Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ApplyTask patch behavior #1332

Merged
merged 16 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 206 additions & 31 deletions pkg/engine/task/task_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@ package task

import (
"context"
"encoding/json"
"fmt"
"log"

apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
apijson "k8s.io/apimachinery/pkg/util/json"
"k8s.io/client-go/discovery"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
"github.com/kudobuilder/kudo/pkg/engine/health"
"github.com/kudobuilder/kudo/pkg/engine/resource"
"github.com/kudobuilder/kudo/pkg/util/kudo"
)

// ApplyTask will apply a set of given resources to the cluster. See Run method for more details.
Expand All @@ -24,6 +29,10 @@ type ApplyTask struct {
Resources []string
}

var (
metadataAccessor = meta.NewAccessor()
)

// Run method for the ApplyTask. Given the task context, it renders the templates using context parameters
// creates runtime objects and enhances them, and applies them using the controller client. Finally,
// resources are checked for health.
Expand All @@ -41,7 +50,7 @@ func (at ApplyTask) Run(ctx Context) (bool, error) {
}

// 3. - Apply them using the client -
applied, err := apply(enhanced, ctx.Client, ctx.Discovery)
applied, err := applyResources(enhanced, ctx)
if err != nil {
return false, err
}
Expand All @@ -57,24 +66,52 @@ func (at ApplyTask) Run(ctx Context) (bool, error) {
return true, nil
}

func addLastAppliedConfigAnnotation(r runtime.Object) error {
// Serialize object
rSer, err := json.Marshal(r)
if err != nil {
return fmt.Errorf("failed to marshal obj: %v", err)
}

annotations, err := metadataAccessor.Annotations(r)
if err != nil {
return fmt.Errorf("failed to access annotations: %v", err)
}
if annotations == nil {
annotations = map[string]string{}
}

// Set serialized object as an annotation on itself
annotations[kudo.LastAppliedConfigAnnotation] = string(rSer)
if err := metadataAccessor.SetAnnotations(r, annotations); err != nil {
return err
}

return nil
}

// apply method takes a slice of k8s object and applies them using passed client. If an object
// doesn't exist it will be created. An already existing object will be patched.
func apply(rr []runtime.Object, c client.Client, di discovery.DiscoveryInterface) ([]runtime.Object, error) {
func applyResources(rr []runtime.Object, ctx Context) ([]runtime.Object, error) {
applied := make([]runtime.Object, 0)

for _, r := range rr {
existing := r.DeepCopyObject()

key, err := resource.ObjectKeyFromObject(r, di)
key, err := resource.ObjectKeyFromObject(r, ctx.Discovery)
if err != nil {
return nil, err
}

err = c.Get(context.TODO(), key, existing)
err = ctx.Client.Get(context.TODO(), key, existing)

switch {
case apierrors.IsNotFound(err): // create resource if it doesn't exist
err = c.Create(context.TODO(), r)
if err := addLastAppliedConfigAnnotation(r); err != nil {
return nil, fmt.Errorf("failed to add last applied config annotation to %s/%s: %w", key.Namespace, key.Name, err)
}

err = ctx.Client.Create(context.TODO(), r)
// c.Create always overrides the input, in this case, the object that had previously set GVK loses it (at least for integration tests)
// and this was causing problems in health module
// with error failed to convert *unstructured.Unstructured to *v1.Deployment: Object 'Kind' is missing in 'unstructured object has no kind'
Expand All @@ -88,9 +125,9 @@ func apply(rr []runtime.Object, c client.Client, di discovery.DiscoveryInterface
case err != nil: // raise any error other than StatusReasonNotFound
return nil, err
default: // update existing resource
err := patch(r, c)
err := patchResource(r, existing, ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to patch %s/%s: %w", key.Namespace, key.Name, err)
}
applied = append(applied, r)
}
Expand All @@ -99,42 +136,96 @@ func apply(rr []runtime.Object, c client.Client, di discovery.DiscoveryInterface
return applied, nil
}

// patch calls update method on kubernetes client to make sure the current resource reflects what is on server
//
// an obvious optimization here would be to not patch when objects are the same, however that is not easy
// kubernetes native objects might be a problem because we cannot just compare the spec as the spec might have extra fields
// and those extra fields are set by some kubernetes component
// because of that for now we just try to apply the patch every time
// it mutates the object passed in to be consistent with the kubernetes client behavior
func patch(newObj runtime.Object, c client.Client) error {
newObjJSON, _ := apijson.Marshal(newObj)
key, _ := client.ObjectKeyFromObject(newObj)
_, isUnstructured := newObj.(runtime.Unstructured)
_, isCRD := newObj.(*apiextv1beta1.CustomResourceDefinition)
func patchResource(modifiedObj, currentObj runtime.Object, ctx Context) error {

if isUnstructured || isCRD || isKudoType(newObj) {
// strategic merge patch is not supported for these types, falling back to merge patch
err := c.Patch(context.TODO(), newObj, client.ConstantPatch(types.MergePatchType, newObjJSON))
if err != nil {
return fmt.Errorf("failed to apply merge patch to object %s/%s: %w", key.Namespace, key.Name, err)
}
// Serialize current configuration
current, err := json.Marshal(currentObj)
if err != nil {
return fatalExecutionError(fmt.Errorf("failed to marshal current %v", err), taskRenderingError, ctx.Meta)
}

// Get previous configuration from currentObjs annotation
original, err := getOriginalConfiguration(currentObj)
if err != nil {
return fatalExecutionError(fmt.Errorf("failed to get original configuration %v", err), taskRenderingError, ctx.Meta)
}

// Get new (modified) configuration
modified, err := getModifiedConfiguration(modifiedObj, true, unstructured.UnstructuredJSONScheme)
if err != nil {
return fatalExecutionError(fmt.Errorf("failed to get modified config %v", err), taskRenderingError, ctx.Meta)
}

// Create the actual patchResource
var patchData []byte
var patchType types.PatchType
if useSimpleThreeWayMerge(modifiedObj) {
patchType = types.MergePatchType
patchData, err = simpleThreeWayMergePatch(original, modified, current)
} else {
err := c.Patch(context.TODO(), newObj, client.ConstantPatch(types.StrategicMergePatchType, newObjJSON))
if err != nil {
return fmt.Errorf("failed to apply StrategicMergePatch to object %s/%s: %w", key.Namespace, key.Name, err)
}
patchType = types.StrategicMergePatchType
patchData, err = strategicThreeWayMergePatch(modifiedObj, original, modified, current)
}

if err != nil {
// TODO: We could try to delete/create here, but that would be different behavior from before
return fatalExecutionError(fmt.Errorf("failed to create patch: %v", err), taskRenderingError, ctx.Meta)
}

// Execute the patchResource
err = ctx.Client.Patch(context.TODO(), modifiedObj, client.ConstantPatch(patchType, patchData))
if err != nil {
return fmt.Errorf("failed to execute patch: %v", err)
}
return nil
}

func useSimpleThreeWayMerge(newObj runtime.Object) bool {
_, isUnstructured := newObj.(runtime.Unstructured)
_, isCRD := newObj.(*apiextv1beta1.CustomResourceDefinition)
return isUnstructured || isCRD || isKudoType(newObj)
}

func isKudoType(object runtime.Object) bool {
_, isOperator := object.(*v1beta1.OperatorVersion)
_, isOperatorVersion := object.(*v1beta1.Operator)
_, isInstance := object.(*v1beta1.Instance)
return isOperator || isOperatorVersion || isInstance
}

func simpleThreeWayMergePatch(original, modified, current []byte) ([]byte, error) {
preconditions := []mergepatch.PreconditionFunc{
mergepatch.RequireKeyUnchanged("apiVersion"),
mergepatch.RequireKeyUnchanged("kind"),
mergepatch.RequireMetadataKeyUnchanged("name"),
}

patchData, err := jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
if err != nil {
if mergepatch.IsPreconditionFailed(err) {
return nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
}
return nil, fmt.Errorf(" failed to create json merge patch: %v", err)
}

return patchData, nil
}

func strategicThreeWayMergePatch(r runtime.Object, original, modified, current []byte) ([]byte, error) {
// Create the patchResource
patchMeta, err := strategicpatch.NewPatchMetaFromStruct(r)
if err != nil {
return nil, fmt.Errorf("failed to create patch meta %v", err)
}

patchData, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, patchMeta, true)
if err != nil {
return nil, fmt.Errorf("failed to create patch data %v", err)
}

return patchData, nil
}

func isHealthy(ro []runtime.Object) error {
for _, r := range ro {
err := health.IsHealthy(r)
Expand All @@ -145,3 +236,87 @@ func isHealthy(ro []runtime.Object) error {
}
return nil
}

// copy from k8s.io/kubectl@v0.16.6/pkg/util/apply.go, with adjustments
// GetOriginalConfiguration retrieves the original configuration of the object
// from the annotation.
// returns an error if the annotation is nil or invalid JSON
func getOriginalConfiguration(obj runtime.Object) ([]byte, error) {
annots, err := metadataAccessor.Annotations(obj)
if err != nil {
return nil, err
}

if annots == nil {
return nil, nil
}

original, ok := annots[kudo.LastAppliedConfigAnnotation]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got stuck at this line. Are we backwards compatible after this is merged? In a scenario where someone re-runs a deploy plan for an existing operator (after updating KUDO), all plan resources will be patched, correct? However, there is no LastAppliedConfigAnnotation so it will return nill, nil and the subsequent merges will... fail?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is a problem. I'll try to figure something out about this...

if !ok {
return nil, fmt.Errorf("LastAppliedConfigAnnotation is not set")
}

originalBytes := []byte(original)

originalMap := map[string]interface{}{}
if len(original) > 0 {
if err := json.Unmarshal(originalBytes, &originalMap); err != nil {
return nil, fmt.Errorf("LastAppliedConfigAnnotation is invalid JSON")
}
}

return []byte(original), nil
}

// copy from k8s.io/kubectl@v0.16.6/pkg/util/apply.go, but with different annotation
// GetModifiedConfiguration retrieves the modified configuration of the object.
// If annotate is true, it embeds the result as an annotation in the modified
// configuration. If an object was read from the command input, it will use that
// version of the object. Otherwise, it will use the version from the server.
func getModifiedConfiguration(obj runtime.Object, annotate bool, codec runtime.Encoder) ([]byte, error) {
// First serialize the object without the annotation to prevent recursion,
// then add that serialization to it as the annotation and serialize it again.
var modified []byte

// Otherwise, use the server side version of the object.
// Get the current annotations from the object.
annots, err := metadataAccessor.Annotations(obj)
if err != nil {
return nil, err
}

if annots == nil {
annots = map[string]string{}
}

original := annots[kudo.LastAppliedConfigAnnotation]
delete(annots, kudo.LastAppliedConfigAnnotation)
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}

modified, err = runtime.Encode(codec, obj)
if err != nil {
return nil, err
}

if annotate {
annots[kudo.LastAppliedConfigAnnotation] = string(modified)
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}

modified, err = runtime.Encode(codec, obj)
if err != nil {
return nil, err
}
}

// Restore the object to its original condition.
annots[kudo.LastAppliedConfigAnnotation] = original
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}

return modified, nil
}
4 changes: 2 additions & 2 deletions pkg/engine/task/task_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (dt DeleteTask) Run(ctx Context) (bool, error) {
}

// 3. - Delete them using the client -
err = delete(enhanced, ctx.Client)
err = deleteResource(enhanced, ctx.Client)
if err != nil {
return false, err
}
Expand All @@ -39,7 +39,7 @@ func (dt DeleteTask) Run(ctx Context) (bool, error) {
return true, nil
}

func delete(ro []runtime.Object, c client.Client) error {
func deleteResource(ro []runtime.Object, c client.Client) error {
for _, r := range ro {
err := c.Delete(context.TODO(), r, client.PropagationPolicy(metav1.DeletePropagationForeground))
if !apierrors.IsNotFound(err) && err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/engine/task/task_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (pt PipeTask) Run(ctx Context) (bool, error) {
}

// 6. - Apply pod using the client -
podObj, err = apply(podObj, ctx.Client, ctx.Discovery)
podObj, err = applyResources(podObj, ctx)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -127,14 +127,14 @@ func (pt PipeTask) Run(ctx Context) (bool, error) {
}

// 11. - Apply artifacts using the client -
_, err = apply(artObj, ctx.Client, ctx.Discovery)
_, err = applyResources(artObj, ctx)
if err != nil {
return false, err
}

// 12. - Delete pipe pod -
log.Printf("PipeTask: %s/%s deleting pipe pod", ctx.Meta.InstanceNamespace, ctx.Meta.InstanceName)
err = delete(podObj, ctx.Client)
err = deleteResource(podObj, ctx.Client)
if err != nil {
return false, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/kudo/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ const (

// PlanUIDAnnotation is a k8s annotation key for the last time a given plan was run on the referenced object
PlanUIDAnnotation = "kudo.dev/last-plan-execution-uid"

// Last applied state for three way merges
LastAppliedConfigAnnotation = "kudo.dev/last-applied-configuration"
zen-dog marked this conversation as resolved.
Show resolved Hide resolved
)
17 changes: 17 additions & 0 deletions test/integration/remove-pod-container/00-assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
spec:
replicas: 1
template:
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80
- name: nginx2
image: nginx:1.7.9
ports:
- containerPort: 8080
Loading