Skip to content

Commit

Permalink
Get rid of server-side apply
Browse files Browse the repository at this point in the history
Currently the operator uses Server-Side Apply (SSA)[1] to manage its
resources. Unfortunately, this approach has some drawbacks as it
allows users to modify resources independently from the operator.

To prevent this behavior, this commit adds custom apply functions
inspired by ones from resourseapply module in library-go[2].

[1] https://kubernetes.io/docs/reference/using-api/server-side-apply/
[2] https://github.com/openshift/library-go/tree/master/pkg/operator/resource/resourceapply
  • Loading branch information
Fedosin committed Oct 25, 2021
1 parent bd3c894 commit c8cc989
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 20 deletions.
33 changes: 13 additions & 20 deletions pkg/controllers/clusteroperator_controller.go
Expand Up @@ -23,6 +23,7 @@ import (
configv1 "github.com/openshift/api/config/v1"
operatorv1 "github.com/openshift/api/operator/v1"
"github.com/openshift/library-go/pkg/cloudprovider"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -156,35 +157,27 @@ func (r *CloudOperatorReconciler) sync(ctx context.Context, config config.Operat
// server-side apply patch and will enforce all the conflicts
func (r *CloudOperatorReconciler) applyResources(ctx context.Context, resources []client.Object) (bool, error) {
updated := false
var err error

for _, resource := range resources {
resourceExisting := resource.DeepCopyObject().(client.Object)
err := r.Get(ctx, client.ObjectKeyFromObject(resourceExisting), resourceExisting)
if errors.IsNotFound(err) {
klog.Infof("Resource %s %q needs to be created, operator progressing...", resource.GetObjectKind().GroupVersionKind(), client.ObjectKeyFromObject(resource))
updated = true
} else if err != nil {
r.Recorder.Event(resource, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
switch t := resource.(type) {
case *appsv1.Deployment:
updated, err = ApplyDeployment(ctx, r.Client, r.Recorder, t)
case *appsv1.DaemonSet:
updated, err = ApplyDaemonSet(ctx, r.Client, r.Recorder, t)
case *corev1.ConfigMap:
updated, err = ApplyConfigMap(ctx, r.Client, r.Recorder, t)
default:
return false, fmt.Errorf("unhandled type %T", resource)
}

resourceUpdated := resource.DeepCopyObject().(client.Object)
if err := r.Patch(ctx, resourceUpdated, client.Apply, client.ForceOwnership, client.FieldOwner(clusterOperatorName)); err != nil {
klog.Errorf("Unable to apply object %s '%s': %+v", resource.GetObjectKind().GroupVersionKind(), resource.GetName(), err)
r.Recorder.Event(resourceExisting, corev1.EventTypeWarning, "Update failed", err.Error())
if err != nil {
return false, err
}
klog.V(2).Infof("Applied %s %q successfully", resource.GetObjectKind().GroupVersionKind(), client.ObjectKeyFromObject(resource))

if resourceExisting.GetGeneration() != resourceUpdated.GetGeneration() {
klog.Infof("Resource %s %q generation increased, resource updated, operator progressing...", resource.GetObjectKind().GroupVersionKind(), client.ObjectKeyFromObject(resource))
updated = true
r.Recorder.Event(resourceExisting, corev1.EventTypeNormal, "Updated successfully", "Resource was successfully updated")
}

if err := r.watcher.Watch(ctx, resource); err != nil {
klog.Errorf("Unable to establish watch on object %s '%s': %+v", resource.GetObjectKind().GroupVersionKind(), resource.GetName(), err)
r.Recorder.Event(resourceExisting, corev1.EventTypeWarning, "Establish watch failed", err.Error())
r.Recorder.Event(resource, corev1.EventTypeWarning, "Establish watch failed", err.Error())
return false, err
}
}
Expand Down
226 changes: 226 additions & 0 deletions pkg/controllers/resourceapply.go
@@ -0,0 +1,226 @@
package controllers

import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"

coreclientv1 "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
)

const specHashAnnotation = "operator.openshift.io/spec-hash"

// SetSpecHashAnnotation computes the hash of the provided spec and sets an annotation of the
// hash on the provided ObjectMeta. This method is used internally by Apply<type> methods, and
// is exposed to support testing with fake clients that need to know the mutated form of the
// resource resulting from an Apply<type> call.
func SetSpecHashAnnotation(objMeta *metav1.ObjectMeta, spec interface{}) error {
jsonBytes, err := json.Marshal(spec)
if err != nil {
return err
}
specHash := fmt.Sprintf("%x", sha256.Sum256(jsonBytes))
if objMeta.Annotations == nil {
objMeta.Annotations = map[string]string{}
}
objMeta.Annotations[specHashAnnotation] = specHash
return nil
}

// ApplyConfigMap merges objectmeta, requires data
func ApplyConfigMap(ctx context.Context, client coreclientv1.Client, recorder record.EventRecorder, required *corev1.ConfigMap) (bool, error) {
existing := &corev1.ConfigMap{}
err := client.Get(ctx, coreclientv1.ObjectKeyFromObject(required), existing)
if apierrors.IsNotFound(err) {
requiredCopy := required.DeepCopy()
err := client.Create(ctx, resourcemerge.WithCleanLabelsAndAnnotations(requiredCopy).(*corev1.ConfigMap))
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}
recorder.Event(requiredCopy, corev1.EventTypeNormal, "Updated successfully", "Resource was successfully updated")
return true, nil
}
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}

modified := resourcemerge.BoolPtr(false)
existingCopy := existing.DeepCopy()

resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta)

caBundleInjected := required.Labels["config.openshift.io/inject-trusted-cabundle"] == "true"
_, newCABundleRequired := required.Data["ca-bundle.crt"]

var modifiedKeys []string
for existingCopyKey, existingCopyValue := range existingCopy.Data {
// if we're injecting a ca-bundle and the required isn't forcing the value, then don't use the value of existing
// to drive a diff detection. If required has set the value then we need to force the value in order to have apply
// behave predictably.
if caBundleInjected && !newCABundleRequired && existingCopyKey == "ca-bundle.crt" {
continue
}
if requiredValue, ok := required.Data[existingCopyKey]; !ok || (existingCopyValue != requiredValue) {
modifiedKeys = append(modifiedKeys, "data."+existingCopyKey)
}
}
for existingCopyKey, existingCopyBinValue := range existingCopy.BinaryData {
if requiredBinValue, ok := required.BinaryData[existingCopyKey]; !ok || !bytes.Equal(existingCopyBinValue, requiredBinValue) {
modifiedKeys = append(modifiedKeys, "binaryData."+existingCopyKey)
}
}
for requiredKey := range required.Data {
if _, ok := existingCopy.Data[requiredKey]; !ok {
modifiedKeys = append(modifiedKeys, "data."+requiredKey)
}
}
for requiredBinKey := range required.BinaryData {
if _, ok := existingCopy.BinaryData[requiredBinKey]; !ok {
modifiedKeys = append(modifiedKeys, "binaryData."+requiredBinKey)
}
}

dataSame := len(modifiedKeys) == 0
if dataSame && !*modified {
return false, nil
}
existingCopy.Data = required.Data
existingCopy.BinaryData = required.BinaryData
// if we're injecting a cabundle, and we had a previous value, and the required object isn't setting the value, then set back to the previous
if existingCABundle, existedBefore := existing.Data["ca-bundle.crt"]; caBundleInjected && existedBefore && !newCABundleRequired {
if existingCopy.Data == nil {
existingCopy.Data = map[string]string{}
}
existingCopy.Data["ca-bundle.crt"] = existingCABundle
}

// at this point we know that we're going to perform a write. We're just trying to get the object correct
toWrite := existingCopy // shallow copy so the code reads easier

err = client.Update(ctx, toWrite)
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}
recorder.Event(toWrite, corev1.EventTypeNormal, "Updated successfully", "Resource was successfully updated")
return true, err
}

func ApplyDeployment(ctx context.Context, client coreclientv1.Client, recorder record.EventRecorder, requiredOriginal *appsv1.Deployment) (bool, error) {
required := requiredOriginal.DeepCopy()
err := SetSpecHashAnnotation(&required.ObjectMeta, required.Spec)
if err != nil {
return false, err
}

if required.Annotations == nil {
required.Annotations = map[string]string{}
}
if _, ok := required.Annotations[specHashAnnotation]; !ok {
// If the spec hash annotation is not present, the caller expects the
// pull-spec annotation to be applied.
required.Annotations["operator.openshift.io/pull-spec"] = required.Spec.Template.Spec.Containers[0].Image
}

existing := &appsv1.Deployment{}
err = client.Get(ctx, coreclientv1.ObjectKeyFromObject(required), existing)
if apierrors.IsNotFound(err) {
err := client.Create(ctx, required)
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}
recorder.Event(required, corev1.EventTypeNormal, "Updated successfully", "Resource was successfully updated")
return true, nil
}
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}

modified := resourcemerge.BoolPtr(false)
existingCopy := existing.DeepCopy()

resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta)
if !*modified {
return false, nil
}

// at this point we know that we're going to perform a write. We're just trying to get the object correct
toWrite := existingCopy // shallow copy so the code reads easier
toWrite.Spec = *required.Spec.DeepCopy()

err = client.Update(ctx, toWrite)
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}
recorder.Event(required, corev1.EventTypeNormal, "Updated successfully", "Resource was successfully updated")
return true, nil
}

func ApplyDaemonSet(ctx context.Context, client coreclientv1.Client, recorder record.EventRecorder, requiredOriginal *appsv1.DaemonSet) (bool, error) {

required := requiredOriginal.DeepCopy()
err := SetSpecHashAnnotation(&required.ObjectMeta, required.Spec)
if err != nil {
return false, err
}

if required.Annotations == nil {
required.Annotations = map[string]string{}
}
if _, ok := required.Annotations[specHashAnnotation]; !ok {
// If the spec hash annotation is not present, the caller expects the
// pull-spec annotation to be applied.
required.Annotations["operator.openshift.io/pull-spec"] = required.Spec.Template.Spec.Containers[0].Image
}

existing := &appsv1.DaemonSet{}
err = client.Get(ctx, coreclientv1.ObjectKeyFromObject(required), existing)
if apierrors.IsNotFound(err) {
err = client.Create(ctx, required)
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}
recorder.Event(required, corev1.EventTypeNormal, "Updated successfully", "Resource was successfully updated")
return true, nil
}
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}

modified := resourcemerge.BoolPtr(false)
existingCopy := existing.DeepCopy()

resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta)
if !*modified {
return false, nil
}

// at this point we know that we're going to perform a write. We're just trying to get the object correct
toWrite := existingCopy // shallow copy so the code reads easier
toWrite.Spec = *required.Spec.DeepCopy()

err = client.Update(ctx, toWrite)
if err != nil {
recorder.Event(required, corev1.EventTypeWarning, "Update failed", err.Error())
return false, err
}
recorder.Event(required, corev1.EventTypeNormal, "Updated successfully", "Resource was successfully updated")
return true, nil
}

0 comments on commit c8cc989

Please sign in to comment.