appliedmanifestwork eviction (open-cluster-management-io#190)
Signed-off-by: Wei Liu <liuweixa@redhat.com>
skeeey authored and xuezhaojun committed Mar 21, 2023
1 parent 76c77a0 commit 07d7ec4
Showing 9 changed files with 371 additions and 262 deletions.
4 changes: 4 additions & 0 deletions deploy/spoke/appliedmanifestworks.crd.yaml
@@ -72,6 +72,10 @@ spec:
version:
description: Version is the version of the Kubernetes resource.
type: string
evictionStartTime:
description: 'EvictionStartTime represents the time at which eviction of the current appliedmanifestwork started; the appliedmanifestwork will be evicted from the managed cluster after a grace period. An appliedmanifestwork will be evicted in the following two scenarios: - the manifestwork of the current appliedmanifestwork is missing on the hub, or - the appliedmanifestwork hub hash does not match the current hub hash of the work agent.'
type: string
format: date-time
served: true
storage: true
subresources:
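
The new evictionStartTime field is surfaced on the Go API type as Status.EvictionStartTime (*metav1.Time). A minimal sketch, assuming the open-cluster-management.io/api work v1 types that include the new EvictionStartTime field and an illustrative 10-minute grace period, of how a consumer of the API could check whether the grace period has elapsed:

package main

import (
	"fmt"
	"time"

	workapiv1 "open-cluster-management.io/api/work/v1"
)

// evictionDue reports whether the appliedmanifestwork has been pending eviction
// for longer than the given grace period.
func evictionDue(amw *workapiv1.AppliedManifestWork, gracePeriod time.Duration) bool {
	start := amw.Status.EvictionStartTime
	if start == nil {
		// eviction has not started for this appliedmanifestwork
		return false
	}
	return time.Now().After(start.Add(gracePeriod))
}

func main() {
	amw := &workapiv1.AppliedManifestWork{}
	fmt.Println(evictionDue(amw, 10*time.Minute)) // false: no eviction in progress
}
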
3 changes: 2 additions & 1 deletion pkg/helper/helper_test.go
@@ -4,11 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/google/go-cmp/cmp"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"

"github.com/openshift/library-go/pkg/operator/events/eventstesting"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -68,13 +68,11 @@ func (m *ManifestWorkFinalizeController) sync(ctx context.Context, controllerCon

manifestWork, err := m.manifestWorkLister.Get(manifestWorkName)

// Delete appliedmanifestwork if relating manifestwork is not found or being deleted
// Delete appliedmanifestwork if relating manifestwork is being deleted
switch {
case errors.IsNotFound(err):
err := m.deleteAppliedManifestWork(ctx, appliedManifestWorkName)
if err != nil {
return err
}
// the appliedmanifestwork will be evicted if the relating manifestwork is not found
return nil
case err != nil:
return err
case !manifestWork.DeletionTimestamp.IsZero():
@@ -185,19 +185,13 @@ func TestSyncManifestWorkController(t *testing.T) {
Name: fmt.Sprintf("%s-work", hubHash),
},
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Errorf("Expect 2 actions on appliedmanifestwork, but have %d", len(actions))
}

spoketesting.AssertAction(t, actions[0], "delete")
},
validateAppliedManifestWorkActions: noAction,
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("Suppose nothing done for manifestwork")
}
},
expectedQueueLen: 1,
expectedQueueLen: 0,
},
}

@@ -2,17 +2,20 @@ package finalizercontroller

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workinformer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
@@ -21,134 +24,145 @@ import (
"open-cluster-management.io/work/pkg/helper"
)

const byWorkNameAndAgentID = "UnManagedAppliedManifestWork-byWorkNameAndAgentID"

// UnManagedAppliedWorkController deletes unmanaged applied works.
type UnManagedAppliedWorkController struct {
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
appliedManifestWorkLister worklister.AppliedManifestWorkLister
appliedManifestWorkIndexer cache.Indexer
hubHash string
agentID string
type unmanagedAppliedWorkController struct {
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
appliedManifestWorkLister worklister.AppliedManifestWorkLister
hubHash string
agentID string
evictionGracePeriod time.Duration
rateLimiter workqueue.RateLimiter
}

// NewUnManagedAppliedWorkController returns a controller to evict the unmanaged appliedmanifestworks.
//
// An appliedmanifestwork will be considered unmanaged in the following scenarios:
// - the manifestwork of the current appliedmanifestwork is missing on the hub, or
// - the appliedmanifestwork hub hash does not match the current hub hash of the work agent.
//
// An unmanaged appliedmanifestwork will be evicted from the managed cluster after a grace period (by
// default, 10 minutes). After an appliedmanifestwork is evicted from the managed cluster, its owned
// resources will also be evicted from the managed cluster by Kubernetes garbage collection.
func NewUnManagedAppliedWorkController(
recorder events.Recorder,
manifestWorkInformer workinformer.ManifestWorkInformer,
manifestWorkLister worklister.ManifestWorkNamespaceLister,
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
appliedManifestWorkInformer workinformer.AppliedManifestWorkInformer,
evictionGracePeriod time.Duration,
hubHash, agentID string,
) factory.Controller {

controller := &UnManagedAppliedWorkController{
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
appliedManifestWorkIndexer: appliedManifestWorkInformer.Informer().GetIndexer(),
hubHash: hubHash,
agentID: agentID,
}

err := appliedManifestWorkInformer.Informer().AddIndexers(cache.Indexers{
byWorkNameAndAgentID: indexByWorkNameAndAgentID,
})
if err != nil {
utilruntime.HandleError(err)
controller := &unmanagedAppliedWorkController{
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
hubHash: hubHash,
agentID: agentID,
evictionGracePeriod: evictionGracePeriod,
rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(1*time.Minute, evictionGracePeriod),
}

return factory.New().
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
accessor, _ := meta.Accessor(obj)
return fmt.Sprintf("%s-%s", hubHash, accessor.GetName())
}, manifestWorkInformer.Informer()).
WithFilteredEventsInformersQueueKeyFunc(func(obj runtime.Object) string {
accessor, _ := meta.Accessor(obj)
return accessor.GetName()
}, helper.AppliedManifestworkHubHashFilter(hubHash), appliedManifestWorkInformer.Informer()).
WithFilteredEventsInformersQueueKeyFunc(
func(obj runtime.Object) string {
accessor, _ := meta.Accessor(obj)
return accessor.GetName()
}, helper.AppliedManifestworkAgentIDFilter(agentID), appliedManifestWorkInformer.Informer()).
WithSync(controller.sync).ToController("UnManagedAppliedManifestWork", recorder)
}
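
For context, the following sketch (not part of this commit) shows how a work agent could construct and start this controller. The package path, the factory and client parameter names, and the 10-minute grace period are assumptions for illustration; only the NewUnManagedAppliedWorkController signature comes from this change.

package spoke

import (
	"context"
	"time"

	"github.com/openshift/library-go/pkg/operator/events"
	workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
	workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
	"open-cluster-management.io/work/pkg/spoke/controllers/finalizercontroller"
)

// startEvictionController wires up the unmanaged appliedmanifestwork eviction controller
// and runs it until the context is cancelled.
func startEvictionController(
	ctx context.Context,
	recorder events.Recorder,
	hubWorkInformers workinformers.SharedInformerFactory, // informers watching the hub
	spokeWorkClient workclientset.Interface, // client for the managed cluster
	spokeWorkInformers workinformers.SharedInformerFactory, // informers watching the managed cluster
	clusterName, hubHash, agentID string,
) {
	controller := finalizercontroller.NewUnManagedAppliedWorkController(
		recorder,
		hubWorkInformers.Work().V1().ManifestWorks(),
		hubWorkInformers.Work().V1().ManifestWorks().Lister().ManifestWorks(clusterName),
		spokeWorkClient.WorkV1().AppliedManifestWorks(),
		spokeWorkInformers.Work().V1().AppliedManifestWorks(),
		10*time.Minute, // eviction grace period
		hubHash, agentID,
	)
	go controller.Run(ctx, 1)
}
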

func (m *UnManagedAppliedWorkController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
func (m *unmanagedAppliedWorkController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
appliedManifestWorkName := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling ManifestWork %q", appliedManifestWorkName)
klog.V(4).Infof("Reconciling AppliedManifestWork %q", appliedManifestWorkName)

appliedManifestWork, err := m.appliedManifestWorkLister.Get(appliedManifestWorkName)
if errors.IsNotFound(err) {
// work not found, could have been deleted, do nothing.
// appliedmanifestwork not found, could have been deleted, do nothing.
return nil
}
if err != nil {
return err
}

// We delete the old AppliedManifestWork only when the related ManifestWork is applied with the new
// AppliedManifestWork as the new owner. This can avoid deleting the old AppliedManifestWork prematurely
// before the new AppliedManifestWork takes the ownership of the applied resources.
manifestWork, err := m.manifestWorkLister.Get(appliedManifestWork.Spec.ManifestWorkName)
_, err = m.manifestWorkLister.Get(appliedManifestWork.Spec.ManifestWorkName)
if errors.IsNotFound(err) {
// work not found, could have been deleted, do nothing.
return nil
// evict the current appliedmanifestwork when its relating manifestwork is missing on the hub
return m.evictAppliedManifestWork(ctx, controllerContext, appliedManifestWork)
}
if err != nil {
return err
}
if !meta.IsStatusConditionTrue(manifestWork.Status.Conditions, workapiv1.WorkApplied) {
// the work is not applied, do nothing.
return nil
}

unManagedAppliedWorks, err := m.getUnManagedAppliedManifestWorksByIndex(appliedManifestWork.Spec.ManifestWorkName, appliedManifestWork.Spec.AgentID)
if err != nil {
return err
// manifestwork exists but hub changed
if !strings.HasPrefix(appliedManifestWork.Name, m.hubHash) {
return m.evictAppliedManifestWork(ctx, controllerContext, appliedManifestWork)
}

var errs []error
for _, appliedWork := range unManagedAppliedWorks {
klog.V(2).Infof("Delete appliedWork %s since it is not managed by agent %s anymore", appliedWork.Name, m.agentID)
err := m.appliedManifestWorkClient.Delete(ctx, appliedWork.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, err)
}
}

return utilerrors.NewAggregate(errs)
// stop evicting the current appliedmanifestwork when its relating manifestwork is recreated on the hub
return m.stopToEvictAppliedManifestWork(ctx, appliedManifestWork)
}
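
The hub-switch branch above relies on the appliedmanifestwork naming convention used elsewhere in this diff, where the hub hash prefixes the manifestwork name (for example, fmt.Sprintf("%s-work", hubHash) in the tests). A small standalone sketch of that check, with made-up values:

package main

import (
	"fmt"
	"strings"
)

// ownedByCurrentHub mirrors the strings.HasPrefix check in sync above.
func ownedByCurrentHub(appliedManifestWorkName, hubHash string) bool {
	return strings.HasPrefix(appliedManifestWorkName, hubHash)
}

func main() {
	// "abc123-mywork" belongs to hub "abc123"; after the agent switches to hub
	// "def456" the prefix no longer matches and the appliedmanifestwork is put
	// on the eviction path.
	fmt.Println(ownedByCurrentHub("abc123-mywork", "abc123")) // true
	fmt.Println(ownedByCurrentHub("abc123-mywork", "def456")) // false
}
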

// getUnManagedAppliedManifestWorksByIndex finds appliedmanifestworks with the same work name and agent ID but a different hub hash.
// These appliedManifestWorks are considered no longer managed by this work agent and should be deleted.
// They are marked as unmanaged because the only way to reach this condition is that the work agent has switched to connect
// to a recovered hub or a fresh new hub. Those appliedmanifestworks need to be deleted to avoid conflicts with the newly connected
// hub.
func (m *UnManagedAppliedWorkController) getUnManagedAppliedManifestWorksByIndex(workName, agentID string) ([]*workapiv1.AppliedManifestWork, error) {
index := agentIDWorkNameIndex(workName, agentID)
items, err := m.appliedManifestWorkIndexer.ByIndex(byWorkNameAndAgentID, index)
if err != nil {
return nil, err
func (m *unmanagedAppliedWorkController) evictAppliedManifestWork(ctx context.Context,
controllerContext factory.SyncContext, appliedManifestWork *workapiv1.AppliedManifestWork) error {
now := time.Now()

evictionStartTime := appliedManifestWork.Status.EvictionStartTime
if evictionStartTime == nil {
return m.patchEvictionStartTime(ctx, appliedManifestWork, &metav1.Time{Time: now})
}

ret := make([]*workapiv1.AppliedManifestWork, 0, len(items))
for _, item := range items {
appliedWork := item.(*workapiv1.AppliedManifestWork)
if appliedWork.Spec.HubHash == m.hubHash {
continue
}
ret = append(ret, item.(*workapiv1.AppliedManifestWork))
if now.Before(evictionStartTime.Add(m.evictionGracePeriod)) {
controllerContext.Queue().AddAfter(appliedManifestWork.Name, m.rateLimiter.When(appliedManifestWork.Name))
return nil
}

return ret, nil
klog.V(2).Infof("Delete appliedWork %s by agent %s after eviction grace periodby", appliedManifestWork.Name, m.agentID)
return m.appliedManifestWorkClient.Delete(ctx, appliedManifestWork.Name, metav1.DeleteOptions{})
}

func indexByWorkNameAndAgentID(obj interface{}) ([]string, error) {
appliedWork, ok := obj.(*workapiv1.AppliedManifestWork)
if !ok {
return []string{}, fmt.Errorf("obj is supposed to be a AppliedManifestWork, but is %T", obj)
func (m *unmanagedAppliedWorkController) stopToEvictAppliedManifestWork(
ctx context.Context, appliedManifestWork *workapiv1.AppliedManifestWork) error {
if appliedManifestWork.Status.EvictionStartTime == nil {
return nil
}

return []string{agentIDWorkNameIndex(appliedWork.Spec.ManifestWorkName, appliedWork.Spec.AgentID)}, nil
m.rateLimiter.Forget(appliedManifestWork.Name)
return m.patchEvictionStartTime(ctx, appliedManifestWork, nil)
}

func agentIDWorkNameIndex(workName, agentID string) string {
return fmt.Sprintf("%s/%s", workName, agentID)
func (m *unmanagedAppliedWorkController) patchEvictionStartTime(ctx context.Context,
appliedManifestWork *workapiv1.AppliedManifestWork, evictionStartTime *metav1.Time) error {
oldData, err := json.Marshal(workapiv1.AppliedManifestWork{
Status: workapiv1.AppliedManifestWorkStatus{
EvictionStartTime: appliedManifestWork.Status.EvictionStartTime,
},
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for appliedmanifestwork status %s: %w", appliedManifestWork.Name, err)
}

newData, err := json.Marshal(workapiv1.AppliedManifestWork{
ObjectMeta: metav1.ObjectMeta{
UID: appliedManifestWork.UID,
ResourceVersion: appliedManifestWork.ResourceVersion,
},
Status: workapiv1.AppliedManifestWorkStatus{
EvictionStartTime: evictionStartTime,
},
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for appliedmanifestwork status %s: %w", appliedManifestWork.Name, err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for cluster %s: %w", appliedManifestWork.Name, err)
}

_, err = m.appliedManifestWorkClient.Patch(ctx, appliedManifestWork.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
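
For illustration only, the two merge patches that patchEvictionStartTime sends to the status subresource look roughly like the following (timestamp, resourceVersion, and uid values are made up); including uid and resourceVersion in the patch makes the update fail with a conflict if the object was replaced or modified after it was read.

Starting eviction (EvictionStartTime nil -> now):

    {"metadata":{"resourceVersion":"1000","uid":"<uid>"},"status":{"evictionStartTime":"2023-03-21T08:00:00Z"}}

Stopping eviction (EvictionStartTime set -> nil):

    {"metadata":{"resourceVersion":"1001","uid":"<uid>"},"status":{"evictionStartTime":null}}
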
