Skip to content

Commit

Permalink
Enhancer and HealthUtil now differentiate between fatal and transient…
Browse files Browse the repository at this point in the history
… errors (#1427)

Summary:
Previously, any error returned by the `Enhancer` was treated as fatal. That made sense because the enhancer acted deterministically on a prepared set of resources. However, since #1319 the enhancer also uses the discovery interface to determine whether a given resource is namespaced. These API requests may fail and must be retried. With this fix, `DefaultEnhancer` explicitly marks certain errors as fatal.

Fixes #1413

Signed-off-by: Aleksey Dukhovniy <alex.dukhovniy@googlemail.com>
  • Loading branch information
Aleksey Dukhovniy committed Mar 18, 2020
1 parent 5a3ad3a commit b5f1f29
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 59 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/instance/instance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (r *Reconciler) getInstance(request ctrl.Request) (instance *kudov1beta1.In
err = r.Get(context.TODO(), request.NamespacedName, instance)
if err != nil {
// Error reading the object - requeue the request.
log.Printf("InstanceController: Error getting instance \"%v\": %v",
log.Printf("InstanceController: Error getting instance %v: %v",
request.NamespacedName,
err)
return nil, err
Expand Down
27 changes: 9 additions & 18 deletions pkg/engine/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,13 @@ import (
"k8s.io/kubectl/pkg/polymorphichelpers"

kudov1beta1 "github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
"github.com/kudobuilder/kudo/pkg/engine"
)

// IsTerminallyFailed reports whether obj is in a terminal failed state, i.e. it
// has no chance of becoming healthy. Only *batchv1.Job objects are inspected, by
// delegating to isJobTerminallyFailed; a nil object or any other type yields
// (false, ""). The returned string is whatever isJobTerminallyFailed reports.
func IsTerminallyFailed(obj runtime.Object) (bool, string) {
	// The comma-ok assertion is false for a nil interface as well as for every
	// non-Job type, so both cases fall through to the "not failed" result.
	if job, isJob := obj.(*batchv1.Job); isJob {
		return isJobTerminallyFailed(job)
	}
	return false, ""
}

func isJobTerminallyFailed(job *batchv1.Job) (bool, string) {
for _, c := range job.Status.Conditions {
if c.Type == batchv1.JobFailed && c.Status == corev1.ConditionTrue {
log.Printf("HealthUtil: Job \"%v\" has failed: %s", job.Name, c.Message)
log.Printf("HealthUtil: Job %q has failed: %s", job.Name, c.Message)
return true, c.Message
}
}
Expand Down Expand Up @@ -80,10 +67,14 @@ func IsHealthy(obj runtime.Object) error {

if obj.Status.Succeeded == int32(1) {
// Done!
log.Printf("HealthUtil: Job \"%v\" is marked healthy", obj.Name)
log.Printf("HealthUtil: Job %q is marked healthy", obj.Name)
return nil
}
return fmt.Errorf("job \"%v\" still running or failed", obj.Name)
if terminal, msg := isJobTerminallyFailed(obj); terminal {
return fmt.Errorf("%wHealthUtil: Job %q has failed terminally: %s", engine.ErrFatalExecution, obj.Name, msg)
}

return fmt.Errorf("job %q still running or failed", obj.Name)
case *kudov1beta1.Instance:
log.Printf("HealthUtil: Instance %v is in state %v", obj.Name, obj.Status.AggregatedStatus.Status)

Expand All @@ -96,7 +87,7 @@ func IsHealthy(obj runtime.Object) error {
if obj.Status.Phase == corev1.PodRunning {
return nil
}
return fmt.Errorf("pod \"%v\" is not running yet: %s", obj.Name, obj.Status.Phase)
return fmt.Errorf("pod %q is not running yet: %s", obj.Name, obj.Status.Phase)

// unless we build logic for what a healthy object is, assume it's healthy when created.
default:
Expand Down
13 changes: 7 additions & 6 deletions pkg/engine/renderer/enhancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/kudobuilder/kudo/pkg/engine"
"github.com/kudobuilder/kudo/pkg/engine/resource"
"github.com/kudobuilder/kudo/pkg/util/kudo"
)
Expand All @@ -36,19 +37,19 @@ func (de *DefaultEnhancer) Apply(templates map[string]string, metadata Metadata)
for name, v := range templates {
parsed, err := YamlToObject(v)
if err != nil {
return nil, fmt.Errorf("parsing YAML from %s failed: %v", name, err)
return nil, fmt.Errorf("%wparsing YAML from %s: %v", engine.ErrFatalExecution, name, err)
}
for _, obj := range parsed {
unstructMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, fmt.Errorf("converting to unstructured failed: %v", err)
return nil, fmt.Errorf("%wconverting to unstructured failed: %v", engine.ErrFatalExecution, err)
}

if err = addLabels(unstructMap, metadata); err != nil {
return nil, fmt.Errorf("adding labels on parsed object: %v", err)
return nil, fmt.Errorf("%wadding labels on parsed object: %v", engine.ErrFatalExecution, err)
}
if err = addAnnotations(unstructMap, metadata); err != nil {
return nil, fmt.Errorf("adding annotations on parsed object %s: %v", obj.GetObjectKind(), err)
return nil, fmt.Errorf("%wadding annotations on parsed object %s: %v", engine.ErrFatalExecution, obj.GetObjectKind(), err)
}

objUnstructured := &unstructured.Unstructured{Object: unstructMap}
Expand All @@ -65,7 +66,7 @@ func (de *DefaultEnhancer) Apply(templates map[string]string, metadata Metadata)
if isNamespaced {
objUnstructured.SetNamespace(metadata.InstanceNamespace)
if err = setControllerReference(metadata.ResourcesOwner, objUnstructured, de.Scheme); err != nil {
return nil, fmt.Errorf("setting controller reference on parsed object %s: %v", obj.GetObjectKind(), err)
return nil, fmt.Errorf("%wsetting controller reference on parsed object %s: %v", engine.ErrFatalExecution, obj.GetObjectKind(), err)
}
}

Expand All @@ -75,7 +76,7 @@ func (de *DefaultEnhancer) Apply(templates map[string]string, metadata Metadata)
// that doesn't belong to the specific object type.
err = runtime.DefaultUnstructuredConverter.FromUnstructured(objUnstructured.UnstructuredContent(), obj)
if err != nil {
return nil, fmt.Errorf("converting from unstructured failed: %v", err)
return nil, fmt.Errorf("%wconverting from unstructured failed: %v", engine.ErrFatalExecution, err)
}
objs = append(objs, obj)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/engine/task/render.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package task

import (
"errors"
"fmt"

"k8s.io/apimachinery/pkg/runtime"

"github.com/kudobuilder/kudo/pkg/engine"
"github.com/kudobuilder/kudo/pkg/engine/renderer"
)

Expand Down Expand Up @@ -45,5 +47,13 @@ func render(resourceNames []string, ctx Context) (map[string]string, error) {
// returns a slice of k8s objects.
func enhance(rendered map[string]string, meta renderer.Metadata, enhancer renderer.Enhancer) ([]runtime.Object, error) {
	objs, applyErr := enhancer.Apply(rendered, meta)

	// Happy path: the enhancer produced objects, pass them through untouched.
	if applyErr == nil {
		return objs, nil
	}

	// Errors marked with engine.ErrFatalExecution are routed through
	// fatalExecutionError together with the taskEnhancementError event name.
	if errors.Is(applyErr, engine.ErrFatalExecution) {
		return nil, fatalExecutionError(applyErr, taskEnhancementError, meta)
	}

	// Every other error is handed back unchanged so the caller decides what to do.
	return nil, applyErr
}
18 changes: 5 additions & 13 deletions pkg/engine/task/task_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package task
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"

Expand All @@ -18,6 +19,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
"github.com/kudobuilder/kudo/pkg/engine"
"github.com/kudobuilder/kudo/pkg/engine/health"
"github.com/kudobuilder/kudo/pkg/engine/resource"
"github.com/kudobuilder/kudo/pkg/util/kudo"
Expand Down Expand Up @@ -46,7 +48,7 @@ func (at ApplyTask) Run(ctx Context) (bool, error) {
// 2. - Enhance them with metadata -
enhanced, err := enhance(rendered, ctx.Meta, ctx.Enhancer)
if err != nil {
return false, fatalExecutionError(err, taskEnhancementError, ctx.Meta)
return false, err
}

// 3. - Apply them using the client -
Expand All @@ -58,8 +60,8 @@ func (at ApplyTask) Run(ctx Context) (bool, error) {
// 4. - Check health for all resources -
err = isHealthy(applied)
if err != nil {
if fatal := isTerminallyFailed(applied); fatal != nil {
return false, fatalExecutionError(fatal, failedTerminalState, ctx.Meta)
if errors.Is(err, engine.ErrFatalExecution) {
return false, fatalExecutionError(err, failedTerminalState, ctx.Meta)
}
// an error during a health check is not treated as a task execution error
log.Printf("TaskExecution: %v", err)
Expand Down Expand Up @@ -239,16 +241,6 @@ func isHealthy(ro []runtime.Object) error {
return nil
}

// isTerminallyFailed walks ro in order and returns an error describing the first
// object for which health.IsTerminallyFailed reports a failure. It returns nil
// when no object is reported as failed (including when ro is empty).
func isTerminallyFailed(ro []runtime.Object) error {
	for i := range ro {
		failed, reason := health.IsTerminallyFailed(ro[i])
		if !failed {
			continue
		}
		// The lookup error is discarded; the key is only used to build the message.
		objKey, _ := client.ObjectKeyFromObject(ro[i])
		return fmt.Errorf("object %s/%s has failed: %s", objKey.Namespace, objKey.Name, reason)
	}
	return nil
}

// copy from k8s.io/kubectl@v0.16.6/pkg/util/apply.go, with adjustments
// GetOriginalConfiguration retrieves the original configuration of the object
// from the annotation.
Expand Down
54 changes: 40 additions & 14 deletions pkg/engine/task/task_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestApplyTask_Run(t *testing.T) {
},
},
{
name: "fails when a kustomizing error occurs",
name: "fails with a fatal error when a fatal kustomizing error occurs",
task: ApplyTask{
Name: "task",
Resources: []string{"pod"},
Expand All @@ -86,7 +86,24 @@ func TestApplyTask_Run(t *testing.T) {
ctx: Context{
Client: fake.NewFakeClientWithScheme(scheme.Scheme),
Discovery: utils.FakeDiscoveryClient(),
Enhancer: &errorEnhancer{},
Enhancer: &fatalErrorEnhancer{},
Meta: meta,
Templates: map[string]string{"pod": resourceAsString(pod("pod1", "default"))},
},
},
{
name: "fails with a transient error when a normal kustomizing error occurs",
task: ApplyTask{
Name: "task",
Resources: []string{"pod"},
},
done: false,
wantErr: true,
fatal: false,
ctx: Context{
Client: fake.NewFakeClientWithScheme(scheme.Scheme),
Discovery: utils.FakeDiscoveryClient(),
Enhancer: &transientErrorEnhancer{},
Meta: meta,
Templates: map[string]string{"pod": resourceAsString(pod("pod1", "default"))},
},
Expand Down Expand Up @@ -126,15 +143,18 @@ func TestApplyTask_Run(t *testing.T) {
}

for _, tt := range tests {
got, err := tt.task.Run(tt.ctx)
assert.True(t, tt.done == got, fmt.Sprintf("%s failed: want = %t, wantErr = %v", tt.name, got, err))
if tt.wantErr {
assert.True(t, errors.Is(err, engine.ErrFatalExecution) == tt.fatal)
assert.Error(t, err)
}
if !tt.wantErr {
assert.NoError(t, err)
}
tt := tt
t.Run(tt.name, func(t *testing.T) {
got, err := tt.task.Run(tt.ctx)
assert.True(t, tt.done == got, fmt.Sprintf("%s failed: want = %t, wantErr = %v", tt.name, got, err))
if tt.wantErr {
assert.True(t, errors.Is(err, engine.ErrFatalExecution) == tt.fatal)
assert.Error(t, err)
}
if !tt.wantErr {
assert.NoError(t, err)
}
})
}
}

Expand Down Expand Up @@ -190,8 +210,14 @@ func (k *testEnhancer) Apply(templates map[string]string, metadata renderer.Meta
return result, nil
}

type errorEnhancer struct{}
type fatalErrorEnhancer struct{}

// Apply ignores its arguments and always fails with an error that wraps
// engine.ErrFatalExecution, so errors.Is(err, engine.ErrFatalExecution) holds.
func (k *fatalErrorEnhancer) Apply(templates map[string]string, metadata renderer.Metadata) ([]runtime.Object, error) {
	// The %w verb sits at the very start of the format string, so the sentinel's
	// own text is emitted directly in front of the message.
	fatalErr := fmt.Errorf("%wsomething fatally bad happens every time", engine.ErrFatalExecution)
	return nil, fatalErr
}

type transientErrorEnhancer struct{}

func (k *errorEnhancer) Apply(templates map[string]string, metadata renderer.Metadata) ([]runtime.Object, error) {
return nil, errors.New("always error")
func (k *transientErrorEnhancer) Apply(templates map[string]string, metadata renderer.Metadata) ([]runtime.Object, error) {
return nil, fmt.Errorf("something transiently bad happens every time")
}
2 changes: 1 addition & 1 deletion pkg/engine/task/task_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (dt DeleteTask) Run(ctx Context) (bool, error) {
// 2. - Enhance them with metadata -
enhanced, err := enhance(rendered, ctx.Meta, ctx.Enhancer)
if err != nil {
return false, fatalExecutionError(err, taskEnhancementError, ctx.Meta)
return false, err
}

// 3. - Delete them using the client -
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/task/task_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestDeleteTask_Run(t *testing.T) {
fatal: true,
ctx: Context{
Client: fake.NewFakeClientWithScheme(scheme.Scheme),
Enhancer: &errorEnhancer{},
Enhancer: &fatalErrorEnhancer{},
Meta: meta,
Templates: map[string]string{"pod": resourceAsString(pod("pod1", "default"))},
},
Expand Down
9 changes: 4 additions & 5 deletions pkg/engine/task/task_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (pt PipeTask) Run(ctx Context) (bool, error) {
// 5. - Enhance pod with metadata
podObj, err := enhance(map[string]string{"pipe-pod.yaml": podYaml}, ctx.Meta, ctx.Enhancer)
if err != nil {
return false, fatalExecutionError(err, taskEnhancementError, ctx.Meta)
return false, err
}

// 6. - Apply pod using the client -
Expand All @@ -97,9 +97,8 @@ func (pt PipeTask) Run(ctx Context) (bool, error) {
// once the pod is Ready, it means that its initContainer finished successfully and we can copy
// out the generated files. An error during a health check is not treated as task execution error
if err != nil {
if fatal := isTerminallyFailed(podObj); fatal != nil {
return false, fatalExecutionError(fatal, failedTerminalState, ctx.Meta)
}
// our pod can not fail terminally, so we treat it as a transient error
log.Printf("TaskExecution: %v", err)
return false, nil
}

Expand All @@ -126,7 +125,7 @@ func (pt PipeTask) Run(ctx Context) (bool, error) {
// 10. - Enhance artifacts -
artObj, err := enhance(artStr, ctx.Meta, ctx.Enhancer)
if err != nil {
return false, fatalExecutionError(err, taskEnhancementError, ctx.Meta)
return false, err
}

// 11. - Apply artifacts using the client -
Expand Down

0 comments on commit b5f1f29

Please sign in to comment.