Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 location_controller uses committer #2638

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 27 additions & 34 deletions pkg/reconciler/scheduling/location/location_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ package location

import (
"context"
"encoding/json"
"fmt"
"time"

jsonpatch "github.com/evanphx/json-patch"
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
Expand All @@ -39,11 +36,13 @@ import (
schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
schedulingv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/scheduling/v1alpha1"
schedulingv1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/scheduling/v1alpha1"
workloadv1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/workload/v1alpha1"
schedulingv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/scheduling/v1alpha1"
workloadv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/workload/v1alpha1"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
)

const (
Expand Down Expand Up @@ -71,6 +70,7 @@ func NewController(
kcpClusterClient: kcpClusterClient,
locationLister: locationInformer.Lister(),
syncTargetLister: syncTargetInformer.Lister(),
commit: committer.NewCommitter[*Location, Patcher, *LocationSpec, *LocationStatus](kcpClusterClient.SchedulingV1alpha1().Locations()),
}

locationInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -107,6 +107,13 @@ func NewController(
return c, nil
}

type Location = schedulingv1alpha1.Location
type LocationSpec = schedulingv1alpha1.LocationSpec
type LocationStatus = schedulingv1alpha1.LocationStatus
type Patcher = schedulingv1alpha1client.LocationInterface
type Resource = committer.Resource[*LocationSpec, *LocationStatus]
type CommitFunc = func(context.Context, *Resource, *Resource) error

// controller.
type controller struct {
queue workqueue.RateLimitingInterface
Expand All @@ -116,6 +123,8 @@ type controller struct {

locationLister schedulingv1alpha1listers.LocationClusterLister
syncTargetLister workloadv1alpha1listers.SyncTargetClusterLister

commit CommitFunc
}

func (c *controller) enqueueLocation(obj interface{}) {
Expand Down Expand Up @@ -210,56 +219,40 @@ func (c *controller) processNextWorkItem(ctx context.Context) bool {

func (c *controller) process(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
clusterName, namespace, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
clusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
if err != nil {
logger.Error(err, "invalid key")
return nil
}

obj, err := c.locationLister.Cluster(clusterName).Get(name)
if err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
return nil // object deleted before we handled it
}
return err
}

old := obj
obj = obj.DeepCopy()

logger = logging.WithObject(logger, obj)
ctx = klog.NewContext(ctx, logger)

var errs []error
if err := c.reconcile(ctx, obj); err != nil {
return err
errs = append(errs, err)
}

// If the object being reconciled changed as a result, update it.
if !equality.Semantic.DeepEqual(old.Status, obj.Status) {
oldData, err := json.Marshal(schedulingv1alpha1.Location{
Status: old.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for LocationDomain %s|%s/%s: %w", clusterName, namespace, name, err)
}
// Regardless of whether reconcile returned an error or not, always try to patch status if needed. Return the
// reconciliation error at the end.

newData, err := json.Marshal(schedulingv1alpha1.Location{
ObjectMeta: metav1.ObjectMeta{
UID: old.UID,
ResourceVersion: old.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: obj.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for LocationDomain %s|%s/%s: %w", clusterName, namespace, name, err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for LocationDomain %s|%s/%s: %w", clusterName, namespace, name, err)
}
_, uerr := c.kcpClusterClient.Cluster(clusterName.Path()).SchedulingV1alpha1().Locations().Patch(ctx, obj.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return uerr
// If the object being reconciled changed as a result, update it.
oldResource := &Resource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status}
newResource := &Resource{ObjectMeta: obj.ObjectMeta, Spec: &obj.Spec, Status: &obj.Status}
if err := c.commit(ctx, oldResource, newResource); err != nil {
errs = append(errs, err)
}

return nil
return utilerrors.NewAggregate(errs)
}