Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add terminal error #2325

Merged
merged 1 commit into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 11 additions & 5 deletions pkg/controller/controllertest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controllertest

import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,28 +36,33 @@ func (ErrorType) GetObjectKind() schema.ObjectKind { return nil }
// DeepCopyObject implements runtime.Object.
func (ErrorType) DeepCopyObject() runtime.Object { return nil }

var _ workqueue.RateLimitingInterface = Queue{}
var _ workqueue.RateLimitingInterface = &Queue{}

// Queue implements a RateLimiting queue as a non-ratelimited queue for testing.
// This helps testing by having functions that use a RateLimiting queue synchronously add items to the queue.
type Queue struct {
workqueue.Interface
AddedRateLimitedLock sync.Mutex
AddedRatelimited []any
}

// AddAfter implements RateLimitingInterface.
func (q Queue) AddAfter(item interface{}, duration time.Duration) {
func (q *Queue) AddAfter(item interface{}, duration time.Duration) {
q.Add(item)
}

// AddRateLimited implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) AddRateLimited(item interface{}) {
func (q *Queue) AddRateLimited(item interface{}) {
q.AddedRateLimitedLock.Lock()
q.AddedRatelimited = append(q.AddedRatelimited, item)
q.AddedRateLimitedLock.Unlock()
q.Add(item)
}

// Forget implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) Forget(item interface{}) {}
func (q *Queue) Forget(item interface{}) {}

// NumRequeues implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) NumRequeues(item interface{}) int {
func (q *Queue) NumRequeues(item interface{}) int {
return 0
}
2 changes: 1 addition & 1 deletion pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("Eventhandler", func() {
var pod *corev1.Pod
var mapper meta.RESTMapper
BeforeEach(func() {
q = controllertest.Queue{Interface: workqueue.New()}
q = &controllertest.Queue{Interface: workqueue.New()}
pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "biz", Name: "baz"},
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,11 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
result, err := c.Reconcile(ctx, req)
switch {
case err != nil:
c.Queue.AddRateLimited(req)
if errors.Is(err, reconcile.TerminalError(nil)) {
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
} else {
c.Queue.AddRateLimited(req)
}
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
log.Error(err, "Reconciler error")
Expand Down
25 changes: 24 additions & 1 deletion pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request if there is an error and continue processing items", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
Expand All @@ -372,6 +371,9 @@ var _ = Describe("controller", func() {
By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
Expect(<-reconciled).To(Equal(request))
queue.AddedRateLimitedLock.Lock()
Expect(queue.AddedRatelimited).To(Equal([]any{request}))
queue.AddedRateLimitedLock.Unlock()

By("Invoking Reconciler a second time without error")
fakeReconcile.AddResult(reconcile.Result{}, nil)
Expand All @@ -382,6 +384,27 @@ var _ = Describe("controller", func() {
Eventually(func() int { return queue.NumRequeues(request) }, 1.0).Should(Equal(0))
})

It("should not requeue a Request if there is a terminal error", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

queue.Add(request)

By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("expected error: reconcile")))
Expect(<-reconciled).To(Equal(request))
alvaroaleman marked this conversation as resolved.
Show resolved Hide resolved

queue.AddedRateLimitedLock.Lock()
Expect(queue.AddedRatelimited).To(BeEmpty())
queue.AddedRateLimitedLock.Unlock()

Expect(queue.Len()).Should(Equal(0))
})

// TODO(directxman12): we should ensure that backoff occurrs with error requeue

It("should not reset backoff until there's a non-error result", func() {
Expand Down
8 changes: 8 additions & 0 deletions pkg/internal/controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ var (
Help: "Total number of reconciliation errors per controller",
}, []string{"controller"})

// TerminalReconcileErrors is a prometheus counter metrics which holds the total
// number of terminal errors from the Reconciler.
TerminalReconcileErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_terminal_reconcile_errors_total",
sbueringer marked this conversation as resolved.
Show resolved Hide resolved
Help: "Total number of terminal reconciliation errors per controller",
}, []string{"controller"})

// ReconcileTime is a prometheus metric which keeps track of the duration
// of reconciliations.
ReconcileTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand Down Expand Up @@ -67,6 +74,7 @@ func init() {
metrics.Registry.MustRegister(
ReconcileTotal,
ReconcileErrors,
TerminalReconcileErrors,
ReconcileTime,
WorkerCount,
ActiveWorkers,
Expand Down
32 changes: 16 additions & 16 deletions pkg/internal/source/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var _ = Describe("Internal", func() {
set = true
},
}
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, funcs, nil)
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, funcs, nil)
})

Describe("EventHandler", func() {
Expand All @@ -99,38 +99,38 @@ var _ = Describe("Internal", func() {
})

It("should used Predicates to filter CreateEvents", func() {
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
set = false
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand All @@ -157,37 +157,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter UpdateEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand Down Expand Up @@ -215,37 +215,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter DeleteEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
Expand Down
24 changes: 24 additions & 0 deletions pkg/reconcile/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package reconcile

import (
"context"
"errors"
"time"

"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -100,3 +101,26 @@ var _ Reconciler = Func(nil)

// Reconcile implements Reconciler.
func (r Func) Reconcile(ctx context.Context, o Request) (Result, error) { return r(ctx, o) }

// TerminalError is an error that will not be retried but still be logged
// and recorded in metrics.
func TerminalError(wrapped error) error {
return &terminalError{err: wrapped}
}

type terminalError struct {
err error
}

func (te *terminalError) Unwrap() error {
return te.err
}

func (te *terminalError) Error() string {
return "terminal error: " + te.err.Error()
}

func (te *terminalError) Is(target error) bool {
tp := &terminalError{}
return errors.As(target, &tp)
}
8 changes: 8 additions & 0 deletions pkg/reconcile/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -88,5 +89,12 @@ var _ = Describe("reconcile", func() {
Expect(actualResult).To(Equal(result))
Expect(actualErr).To(Equal(err))
})

It("should allow unwrapping inner error from terminal error", func() {
inner := apierrors.NewGone("")
terminalError := reconcile.TerminalError(inner)

Expect(apierrors.IsGone(terminalError)).To(BeTrue())
})
})
})