Fix ApplyTask patch behavior (#1332)
ApplyTask now uses proper three-way merges (either plain JSON merge patches or strategic Kubernetes merge patches) to apply resources, the same way `kubectl apply` does.

It stores the last applied version of the resource in an annotation of the resource so that the merge can be done correctly on the next apply.

Already-applied resources do not have this annotation, so the ApplyTask cannot calculate the correct patch and fails; this is the breaking change (see the three-way merge sketch below).

Signed-off-by: Andreas Neumann <aneumann@mesosphere.com>
ANeumann82 committed Feb 28, 2020
1 parent 42bf583 commit 8afbb97
Showing 15 changed files with 349 additions and 36 deletions.
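
To make the mechanism concrete, here is a minimal, self-contained sketch of the plain three-way JSON merge path (illustrative only, not code from this commit; the JSON values are invented, while the `jsonmergepatch` helper is the same one the new `task_apply.go` imports):

```go
// Sketch: "original" plays the role of the kudo.dev/last-applied-configuration
// annotation, "modified" is the newly rendered resource, and "current" is the
// live object read from the cluster.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/jsonmergepatch"
)

func main() {
	original := []byte(`{"spec":{"replicas":1,"paused":true}}`)
	modified := []byte(`{"spec":{"replicas":3}}`)
	current := []byte(`{"spec":{"replicas":1,"paused":true,"minReadySeconds":5}}`)

	patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current)
	if err != nil {
		panic(err)
	}

	// Typically prints {"spec":{"paused":null,"replicas":3}}: "paused" is
	// explicitly cleared because it was dropped from the desired state, while
	// "minReadySeconds" (never tracked in the annotation) stays untouched.
	fmt.Println(string(patch))
}
```

A field that was applied previously but is absent from the new rendering gets an explicit `null` (deletion), while fields the operator never managed are left alone. This is also why resources applied before this change break: without the `kudo.dev/last-applied-configuration` annotation there is no `original`, so the deletions cannot be computed.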
237 changes: 206 additions & 31 deletions pkg/engine/task/task_apply.go
@@ -2,20 +2,25 @@ package task

import (
"context"
"encoding/json"
"fmt"
"log"

apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
apijson "k8s.io/apimachinery/pkg/util/json"
"k8s.io/client-go/discovery"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
"github.com/kudobuilder/kudo/pkg/engine/health"
"github.com/kudobuilder/kudo/pkg/engine/resource"
"github.com/kudobuilder/kudo/pkg/util/kudo"
)

// ApplyTask will apply a set of given resources to the cluster. See Run method for more details.
@@ -24,6 +29,10 @@ type ApplyTask struct {
Resources []string
}

var (
metadataAccessor = meta.NewAccessor()
)

// Run method for the ApplyTask. Given the task context, it renders the templates using context parameters
// creates runtime objects and enhances them, and applies them using the controller client. Finally,
// resources are checked for health.
@@ -41,7 +50,7 @@ func (at ApplyTask) Run(ctx Context) (bool, error) {
}

// 3. - Apply them using the client -
applied, err := apply(enhanced, ctx.Client, ctx.Discovery)
applied, err := applyResources(enhanced, ctx)
if err != nil {
return false, err
}
@@ -57,24 +66,52 @@ func (at ApplyTask) Run(ctx Context) (bool, error) {
return true, nil
}

func addLastAppliedConfigAnnotation(r runtime.Object) error {
// Serialize object
rSer, err := json.Marshal(r)
if err != nil {
return fmt.Errorf("failed to marshal obj: %v", err)
}

annotations, err := metadataAccessor.Annotations(r)
if err != nil {
return fmt.Errorf("failed to access annotations: %v", err)
}
if annotations == nil {
annotations = map[string]string{}
}

// Set serialized object as an annotation on itself
annotations[kudo.LastAppliedConfigAnnotation] = string(rSer)
if err := metadataAccessor.SetAnnotations(r, annotations); err != nil {
return err
}

return nil
}

// apply method takes a slice of k8s object and applies them using passed client. If an object
// doesn't exist it will be created. An already existing object will be patched.
func apply(rr []runtime.Object, c client.Client, di discovery.DiscoveryInterface) ([]runtime.Object, error) {
func applyResources(rr []runtime.Object, ctx Context) ([]runtime.Object, error) {
applied := make([]runtime.Object, 0)

for _, r := range rr {
existing := r.DeepCopyObject()

key, err := resource.ObjectKeyFromObject(r, di)
key, err := resource.ObjectKeyFromObject(r, ctx.Discovery)
if err != nil {
return nil, err
}

err = c.Get(context.TODO(), key, existing)
err = ctx.Client.Get(context.TODO(), key, existing)

switch {
case apierrors.IsNotFound(err): // create resource if it doesn't exist
err = c.Create(context.TODO(), r)
if err := addLastAppliedConfigAnnotation(r); err != nil {
return nil, fmt.Errorf("failed to add last applied config annotation to %s/%s: %w", key.Namespace, key.Name, err)
}

err = ctx.Client.Create(context.TODO(), r)
// c.Create always overrides the input, in this case, the object that had previously set GVK loses it (at least for integration tests)
// and this was causing problems in health module
// with error failed to convert *unstructured.Unstructured to *v1.Deployment: Object 'Kind' is missing in 'unstructured object has no kind'
@@ -88,9 +125,9 @@ func apply(rr []runtime.Object, c client.Client, di discovery.DiscoveryInterface
case err != nil: // raise any error other than StatusReasonNotFound
return nil, err
default: // update existing resource
err := patch(r, c)
err := patchResource(r, existing, ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to patch %s/%s: %w", key.Namespace, key.Name, err)
}
applied = append(applied, r)
}
@@ -99,42 +136,96 @@ func apply(rr []runtime.Object, c client.Client, di discovery.DiscoveryInterface
return applied, nil
}

// patch calls update method on kubernetes client to make sure the current resource reflects what is on server
//
// an obvious optimization here would be to not patch when objects are the same, however that is not easy
// kubernetes native objects might be a problem because we cannot just compare the spec as the spec might have extra fields
// and those extra fields are set by some kubernetes component
// because of that for now we just try to apply the patch every time
// it mutates the object passed in to be consistent with the kubernetes client behavior
func patch(newObj runtime.Object, c client.Client) error {
newObjJSON, _ := apijson.Marshal(newObj)
key, _ := client.ObjectKeyFromObject(newObj)
_, isUnstructured := newObj.(runtime.Unstructured)
_, isCRD := newObj.(*apiextv1beta1.CustomResourceDefinition)
func patchResource(modifiedObj, currentObj runtime.Object, ctx Context) error {

if isUnstructured || isCRD || isKudoType(newObj) {
// strategic merge patch is not supported for these types, falling back to merge patch
err := c.Patch(context.TODO(), newObj, client.ConstantPatch(types.MergePatchType, newObjJSON))
if err != nil {
return fmt.Errorf("failed to apply merge patch to object %s/%s: %w", key.Namespace, key.Name, err)
}
// Serialize current configuration
current, err := json.Marshal(currentObj)
if err != nil {
return fatalExecutionError(fmt.Errorf("failed to marshal current %v", err), taskRenderingError, ctx.Meta)
}

// Get the previously applied configuration from currentObj's annotation
original, err := getOriginalConfiguration(currentObj)
if err != nil {
return fatalExecutionError(fmt.Errorf("failed to get original configuration %v", err), taskRenderingError, ctx.Meta)
}

// Get new (modified) configuration
modified, err := getModifiedConfiguration(modifiedObj, true, unstructured.UnstructuredJSONScheme)
if err != nil {
return fatalExecutionError(fmt.Errorf("failed to get modified config %v", err), taskRenderingError, ctx.Meta)
}

// Create the actual patch
var patchData []byte
var patchType types.PatchType
if useSimpleThreeWayMerge(modifiedObj) {
patchType = types.MergePatchType
patchData, err = simpleThreeWayMergePatch(original, modified, current)
} else {
err := c.Patch(context.TODO(), newObj, client.ConstantPatch(types.StrategicMergePatchType, newObjJSON))
if err != nil {
return fmt.Errorf("failed to apply StrategicMergePatch to object %s/%s: %w", key.Namespace, key.Name, err)
}
patchType = types.StrategicMergePatchType
patchData, err = strategicThreeWayMergePatch(modifiedObj, original, modified, current)
}

if err != nil {
// TODO: We could try to delete/create here, but that would be different behavior from before
return fatalExecutionError(fmt.Errorf("failed to create patch: %v", err), taskRenderingError, ctx.Meta)
}

// Execute the patch
err = ctx.Client.Patch(context.TODO(), modifiedObj, client.ConstantPatch(patchType, patchData))
if err != nil {
return fmt.Errorf("failed to execute patch: %v", err)
}
return nil
}

func useSimpleThreeWayMerge(newObj runtime.Object) bool {
_, isUnstructured := newObj.(runtime.Unstructured)
_, isCRD := newObj.(*apiextv1beta1.CustomResourceDefinition)
return isUnstructured || isCRD || isKudoType(newObj)
}

func isKudoType(object runtime.Object) bool {
_, isOperator := object.(*v1beta1.OperatorVersion)
_, isOperatorVersion := object.(*v1beta1.Operator)
_, isInstance := object.(*v1beta1.Instance)
return isOperator || isOperatorVersion || isInstance
}

func simpleThreeWayMergePatch(original, modified, current []byte) ([]byte, error) {
preconditions := []mergepatch.PreconditionFunc{
mergepatch.RequireKeyUnchanged("apiVersion"),
mergepatch.RequireKeyUnchanged("kind"),
mergepatch.RequireMetadataKeyUnchanged("name"),
}

patchData, err := jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
if err != nil {
if mergepatch.IsPreconditionFailed(err) {
return nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
}
return nil, fmt.Errorf("failed to create json merge patch: %v", err)
}

return patchData, nil
}

func strategicThreeWayMergePatch(r runtime.Object, original, modified, current []byte) ([]byte, error) {
// Create the strategic three-way merge patch
patchMeta, err := strategicpatch.NewPatchMetaFromStruct(r)
if err != nil {
return nil, fmt.Errorf("failed to create patch meta %v", err)
}

patchData, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, patchMeta, true)
if err != nil {
return nil, fmt.Errorf("failed to create patch data %v", err)
}

return patchData, nil
}

func isHealthy(ro []runtime.Object) error {
for _, r := range ro {
err := health.IsHealthy(r)
@@ -145,3 +236,87 @@ func isHealthy(ro []runtime.Object) error {
}
return nil
}

// copy from k8s.io/kubectl@v0.16.6/pkg/util/apply.go, with adjustments
// GetOriginalConfiguration retrieves the original configuration of the object
// from the annotation.
// returns an error if the annotation is not set or contains invalid JSON
func getOriginalConfiguration(obj runtime.Object) ([]byte, error) {
annots, err := metadataAccessor.Annotations(obj)
if err != nil {
return nil, err
}

if annots == nil {
return nil, nil
}

original, ok := annots[kudo.LastAppliedConfigAnnotation]
if !ok {
return nil, fmt.Errorf("LastAppliedConfigAnnotation is not set")
}

originalBytes := []byte(original)

originalMap := map[string]interface{}{}
if len(original) > 0 {
if err := json.Unmarshal(originalBytes, &originalMap); err != nil {
return nil, fmt.Errorf("LastAppliedConfigAnnotation is invalid JSON")
}
}

return []byte(original), nil
}

// copy from k8s.io/kubectl@v0.16.6/pkg/util/apply.go, but with different annotation
// GetModifiedConfiguration retrieves the modified configuration of the object.
// If annotate is true, it embeds the result as an annotation in the modified
// configuration. If an object was read from the command input, it will use that
// version of the object. Otherwise, it will use the version from the server.
func getModifiedConfiguration(obj runtime.Object, annotate bool, codec runtime.Encoder) ([]byte, error) {
// First serialize the object without the annotation to prevent recursion,
// then add that serialization to it as the annotation and serialize it again.
var modified []byte

// Otherwise, use the server side version of the object.
// Get the current annotations from the object.
annots, err := metadataAccessor.Annotations(obj)
if err != nil {
return nil, err
}

if annots == nil {
annots = map[string]string{}
}

original := annots[kudo.LastAppliedConfigAnnotation]
delete(annots, kudo.LastAppliedConfigAnnotation)
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}

modified, err = runtime.Encode(codec, obj)
if err != nil {
return nil, err
}

if annotate {
annots[kudo.LastAppliedConfigAnnotation] = string(modified)
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}

modified, err = runtime.Encode(codec, obj)
if err != nil {
return nil, err
}
}

// Restore the object to its original condition.
annots[kudo.LastAppliedConfigAnnotation] = original
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}

return modified, nil
}
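
To see why `useSimpleThreeWayMerge` reserves the plain JSON merge for unstructured, CRD, and KUDO types, the sketch below (again illustrative, with an invented Deployment snippet) contrasts the two patch flavors on a container list: the strategic patch merges containers by their `name` key and can delete a single element, whereas the plain JSON merge patch treats the list as atomic and replaces it wholesale.

```go
// Sketch comparing strategic vs. plain JSON three-way merge patches for a
// Deployment container list. The JSON literals are invented for the example;
// the helper packages are the ones the commit itself imports.
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/jsonmergepatch"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	// Last-applied and live state both contain two containers; the new
	// rendering drops "nginx2".
	original := []byte(`{"spec":{"template":{"spec":{"containers":[{"name":"nginx","image":"nginx:1.7.9"},{"name":"nginx2","image":"nginx:1.7.9"}]}}}}`)
	modified := []byte(`{"spec":{"template":{"spec":{"containers":[{"name":"nginx","image":"nginx:1.7.9"}]}}}}`)
	current := original

	// Strategic merge: containers merge by their "name" patch key, so the
	// patch expresses "delete the nginx2 element" instead of replacing the list.
	patchMeta, err := strategicpatch.NewPatchMetaFromStruct(&appsv1.Deployment{})
	if err != nil {
		panic(err)
	}
	strategic, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, patchMeta, true)
	if err != nil {
		panic(err)
	}
	fmt.Println("strategic:", string(strategic))

	// Plain JSON merge: lists are atomic, so the containers array in the patch
	// is simply the modified list verbatim.
	plain, err := jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current)
	if err != nil {
		panic(err)
	}
	fmt.Println("json merge:", string(plain))
}
```

Dropping a container from the rendered template therefore becomes a targeted strategic delete rather than a list replacement, which is presumably the behavior the new `remove-pod-container` integration test asserts.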
4 changes: 2 additions & 2 deletions pkg/engine/task/task_delete.go
@@ -30,7 +30,7 @@ func (dt DeleteTask) Run(ctx Context) (bool, error) {
}

// 3. - Delete them using the client -
err = delete(enhanced, ctx.Client)
err = deleteResource(enhanced, ctx.Client)
if err != nil {
return false, err
}
@@ -39,7 +39,7 @@ func (dt DeleteTask) Run(ctx Context) (bool, error) {
return true, nil
}

func delete(ro []runtime.Object, c client.Client) error {
func deleteResource(ro []runtime.Object, c client.Client) error {
for _, r := range ro {
err := c.Delete(context.TODO(), r, client.PropagationPolicy(metav1.DeletePropagationForeground))
if !apierrors.IsNotFound(err) && err != nil {
6 changes: 3 additions & 3 deletions pkg/engine/task/task_pipe.go
@@ -87,7 +87,7 @@ func (pt PipeTask) Run(ctx Context) (bool, error) {
}

// 6. - Apply pod using the client -
podObj, err = apply(podObj, ctx.Client, ctx.Discovery)
podObj, err = applyResources(podObj, ctx)
if err != nil {
return false, err
}
@@ -127,14 +127,14 @@ func (pt PipeTask) Run(ctx Context) (bool, error) {
}

// 11. - Apply artifacts using the client -
_, err = apply(artObj, ctx.Client, ctx.Discovery)
_, err = applyResources(artObj, ctx)
if err != nil {
return false, err
}

// 12. - Delete pipe pod -
log.Printf("PipeTask: %s/%s deleting pipe pod", ctx.Meta.InstanceNamespace, ctx.Meta.InstanceName)
err = delete(podObj, ctx.Client)
err = deleteResource(podObj, ctx.Client)
if err != nil {
return false, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/kudo/labels.go
@@ -19,4 +19,7 @@ const (

// PlanUIDAnnotation is a k8s annotation key for the last time a given plan was run on the referenced object
PlanUIDAnnotation = "kudo.dev/last-plan-execution-uid"

// LastAppliedConfigAnnotation is a k8s annotation key under which the last applied state of a resource is stored for three-way merges
LastAppliedConfigAnnotation = "kudo.dev/last-applied-configuration"
)
17 changes: 17 additions & 0 deletions test/integration/remove-pod-container/00-assert.yaml
@@ -0,0 +1,17 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: nginx
        image: nginx:1.7.9
        ports:
        - containerPort: 80
      - name: nginx2
        image: nginx:1.7.9
        ports:
        - containerPort: 8080