Skip to content

Commit

Permalink
appliedmanifestwork eviction
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Mar 3, 2023
1 parent 21fb8d7 commit ea5e66c
Show file tree
Hide file tree
Showing 15 changed files with 466 additions and 254 deletions.
4 changes: 4 additions & 0 deletions deploy/spoke/appliedmanifestworks.crd.yaml
Expand Up @@ -72,6 +72,10 @@ spec:
version:
description: Version is the version of the Kubernetes resource.
type: string
evictionStartTime:
description: 'EvictionStartTime represents the time at which the current appliedmanifestwork starts to be evicted; after a grace period it will be removed. An appliedmanifestwork will be evicted from the managed cluster in the following two scenarios: - the manifestwork of the current appliedmanifestwork is missing on the hub, or - the appliedmanifestwork hub hash does not match the current hub hash of the work agent.'
type: string
format: date-time
served: true
storage: true
subresources:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -22,7 +22,7 @@ require (
k8s.io/component-base v0.26.1
k8s.io/klog/v2 v2.80.1
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448
open-cluster-management.io/api v0.10.1-0.20230227155011-2c63272fa800
open-cluster-management.io/api v0.10.1-0.20230228092946-080d7b65f4f3
sigs.k8s.io/controller-runtime v0.14.4
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -1024,8 +1024,8 @@ modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I=
open-cluster-management.io/api v0.10.1-0.20230227155011-2c63272fa800 h1:/f9jJbabrWG/PCS42Vcxh/Gl0qxNCaGsZ59fQB1jLhc=
open-cluster-management.io/api v0.10.1-0.20230227155011-2c63272fa800/go.mod h1:TjWobG3dTZJf/Ye04358/F/381RjE/+HXVDGMnZBpjc=
open-cluster-management.io/api v0.10.1-0.20230228092946-080d7b65f4f3 h1:AJaXjQIIUYCbv10bRnurz4d2UZtN1PfpbWD80KtiZGU=
open-cluster-management.io/api v0.10.1-0.20230228092946-080d7b65f4f3/go.mod h1:TjWobG3dTZJf/Ye04358/F/381RjE/+HXVDGMnZBpjc=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
87 changes: 86 additions & 1 deletion pkg/helper/helper_test.go
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/google/go-cmp/cmp"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"

"github.com/openshift/library-go/pkg/operator/events/eventstesting"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -810,3 +811,87 @@ func TestBuildResourceMeta(t *testing.T) {
})
}
}

// TestAppliedManifestworkEvictionFilter verifies that the eviction event filter
// accepts an appliedmanifestwork when either its hub hash or its agent ID
// matches the work agent's current values, and rejects it when both differ or
// when the object is not an *AppliedManifestWork (e.g. nil).
func TestAppliedManifestworkEvictionFilter(t *testing.T) {
	cases := []struct {
		name                string
		appliedManifestWork runtime.Object
		hubHash             string
		agentID             string
		expected            bool
	}{
		{
			// a non-*AppliedManifestWork object (nil here) must be filtered out
			name:     "nil appliedmanifestwork",
			expected: false,
		},
		{
			name: "hub and agent are unchanged",
			appliedManifestWork: &workapiv1.AppliedManifestWork{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test",
				},
				Spec: workapiv1.AppliedManifestWorkSpec{
					HubHash: "h1234",
					AgentID: "a1234",
				},
			},
			hubHash:  "h1234",
			agentID:  "a1234",
			expected: true,
		},
		{
			// matching hub hash alone is enough to keep the object
			name: "hub is unchanged, agent is changed",
			appliedManifestWork: &workapiv1.AppliedManifestWork{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test",
				},
				Spec: workapiv1.AppliedManifestWorkSpec{
					HubHash: "h1234",
					AgentID: "a1234",
				},
			},
			hubHash:  "h1234",
			agentID:  "a5678",
			expected: true,
		},
		{
			// matching agent ID alone is enough to keep the object
			name: "hub is changed, agent is unchanged",
			appliedManifestWork: &workapiv1.AppliedManifestWork{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test",
				},
				Spec: workapiv1.AppliedManifestWorkSpec{
					HubHash: "h1234",
					AgentID: "a1234",
				},
			},
			hubHash:  "h5678",
			agentID:  "a1234",
			expected: true,
		},
		{
			// neither hub hash nor agent ID matches: the object is filtered out
			name: "hub is changed, agent is changed",
			appliedManifestWork: &workapiv1.AppliedManifestWork{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test",
				},
				Spec: workapiv1.AppliedManifestWorkSpec{
					HubHash: "h1234",
					AgentID: "a1234",
				},
			},
			hubHash:  "h5678",
			agentID:  "a5678",
			expected: false,
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			// report the actual value on failure so the mismatch is obvious
			actual := AppliedManifestWorkEvictionFilter(c.hubHash, c.agentID)(c.appliedManifestWork)
			if actual != c.expected {
				t.Errorf("expected %v, but got %v", c.expected, actual)
			}
		})
	}
}
16 changes: 16 additions & 0 deletions pkg/helper/helpers.go
Expand Up @@ -378,6 +378,22 @@ func AppliedManifestworkHubHashFilter(hubHash string) factory.EventFilterFunc {
}
}

// AppliedManifestWorkEvictionFilter returns an event filter that selects the
// appliedmanifestworks associated with this hub (matching hub hash) or with
// this work agent (matching agent ID); any other object is filtered out.
func AppliedManifestWorkEvictionFilter(hubHash, agentID string) factory.EventFilterFunc {
	return func(obj interface{}) bool {
		work, matched := obj.(*workapiv1.AppliedManifestWork)
		if !matched {
			return false
		}
		return work.Spec.HubHash == hubHash || work.Spec.AgentID == agentID
	}
}

// HubHash returns a hash of hubserver
// NOTE: the length of hash string is 64, meaning the length of manifestwork name should be less than 189
func HubHash(hubServer string) string {
Expand Down
Expand Up @@ -2,17 +2,16 @@ package finalizercontroller

import (
"context"
"fmt"
"strings"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workinformer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
Expand All @@ -21,134 +20,105 @@ import (
"open-cluster-management.io/work/pkg/helper"
)

const byWorkNameAndAgentID = "UnManagedAppliedManifestWork-byWorkNameAndAgentID"

// UnManagedAppliedWorkController deletes unmanaged applied works.
type UnManagedAppliedWorkController struct {
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
appliedManifestWorkLister worklister.AppliedManifestWorkLister
appliedManifestWorkIndexer cache.Indexer
hubHash string
agentID string
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
appliedManifestWorkLister worklister.AppliedManifestWorkLister
hubHash string
agentID string
evictionGracePeriod time.Duration
rateLimiter workqueue.RateLimiter
}

func NewUnManagedAppliedWorkController(
recorder events.Recorder,
manifestWorkInformer workinformer.ManifestWorkInformer,
manifestWorkLister worklister.ManifestWorkNamespaceLister,
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
appliedManifestWorkInformer workinformer.AppliedManifestWorkInformer,
evictionGracePeriod time.Duration,
hubHash, agentID string,
) factory.Controller {

controller := &UnManagedAppliedWorkController{
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
appliedManifestWorkIndexer: appliedManifestWorkInformer.Informer().GetIndexer(),
hubHash: hubHash,
agentID: agentID,
}

err := appliedManifestWorkInformer.Informer().AddIndexers(cache.Indexers{
byWorkNameAndAgentID: indexByWorkNameAndAgentID,
})
if err != nil {
utilruntime.HandleError(err)
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
hubHash: hubHash,
agentID: agentID,
evictionGracePeriod: evictionGracePeriod,
rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(1*time.Minute, evictionGracePeriod),
}

return factory.New().
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
accessor, _ := meta.Accessor(obj)
return fmt.Sprintf("%s-%s", hubHash, accessor.GetName())
}, manifestWorkInformer.Informer()).
WithFilteredEventsInformersQueueKeyFunc(func(obj runtime.Object) string {
accessor, _ := meta.Accessor(obj)
return accessor.GetName()
}, helper.AppliedManifestworkHubHashFilter(hubHash), appliedManifestWorkInformer.Informer()).
WithFilteredEventsInformersQueueKeyFunc(
func(obj runtime.Object) string {
accessor, _ := meta.Accessor(obj)
return accessor.GetName()
}, helper.AppliedManifestWorkEvictionFilter(hubHash, agentID), appliedManifestWorkInformer.Informer()).
WithSync(controller.sync).ToController("UnManagedAppliedManifestWork", recorder)
}

func (m *UnManagedAppliedWorkController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
appliedManifestWorkName := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling ManifestWork %q", appliedManifestWorkName)
klog.V(4).Infof("Reconciling AppliedManifestWork %q", appliedManifestWorkName)

appliedManifestWork, err := m.appliedManifestWorkLister.Get(appliedManifestWorkName)
if errors.IsNotFound(err) {
// work not found, could have been deleted, do nothing.
// appliedmanifestwork not found, could have been deleted, do nothing.
return nil
}
if err != nil {
return err
}

// We delete the old AppliedManifestWork only when the related ManifestWork is applied with the new
// AppliedManifestWork as the new owner. This can avoid deleting the old AppliedManifestWork prematurely
// before the new AppliedManifestWork takes the ownership of the applied resources.
manifestWork, err := m.manifestWorkLister.Get(appliedManifestWork.Spec.ManifestWorkName)
_, err = m.manifestWorkLister.Get(appliedManifestWork.Spec.ManifestWorkName)
if errors.IsNotFound(err) {
// work not found, could have been deleted, do nothing.
return nil
// evict the current appliedmanifestwork when its relating manifestwork is missing on the hub
return m.evictAppliedManifestWork(ctx, controllerContext, appliedManifestWork)
}
if err != nil {
return err
}
if !meta.IsStatusConditionTrue(manifestWork.Status.Conditions, workapiv1.WorkApplied) {
// the work is not applied, do nothing.
return nil
}

unManagedAppliedWorks, err := m.getUnManagedAppliedManifestWorksByIndex(appliedManifestWork.Spec.ManifestWorkName, appliedManifestWork.Spec.AgentID)
if err != nil {
return err
// manifestwork exists but hub changed
if !strings.HasPrefix(appliedManifestWork.Name, m.hubHash) {
return m.evictAppliedManifestWork(ctx, controllerContext, appliedManifestWork)
}

var errs []error
for _, appliedWork := range unManagedAppliedWorks {
klog.V(2).Infof("Delete appliedWork %s since it is not managed by agent %s anymore", appliedWork.Name, m.agentID)
err := m.appliedManifestWorkClient.Delete(ctx, appliedWork.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, err)
}
}

return utilerrors.NewAggregate(errs)
// stop to evict the current appliedmanifestwork when its relating manifestwork is recreated on the hub
return m.stopToEvictAppliedManifestWork(ctx, controllerContext, appliedManifestWork)
}

// getUnManagedAppliedManifestWorksByIndex finds appliedmanifestwork with the same workname and agent ID but different hubhash.
// These appliedManifestWorks is considered to be not managed by this work agent anymore and should be deleted.
// The reason of marking them as unmanaged is because the only reason under this conditions is work agent is switched to connect
// to a recovered hub or a fresh new hub. Those appliedmanifestwork needs to be deleted to avoid conflict with the newly connected
// hub.
func (m *UnManagedAppliedWorkController) getUnManagedAppliedManifestWorksByIndex(workName, agentID string) ([]*workapiv1.AppliedManifestWork, error) {
index := agentIDWorkNameIndex(workName, agentID)
items, err := m.appliedManifestWorkIndexer.ByIndex(byWorkNameAndAgentID, index)
if err != nil {
return nil, err
func (m *UnManagedAppliedWorkController) evictAppliedManifestWork(ctx context.Context,
controllerContext factory.SyncContext, appliedManifestWork *workapiv1.AppliedManifestWork) error {
now := time.Now()

evictionStartTime := appliedManifestWork.Status.EvictionStartTime
if evictionStartTime == nil {
copied := appliedManifestWork.DeepCopy()
copied.Status.EvictionStartTime = &metav1.Time{Time: now}
_, err := m.appliedManifestWorkClient.UpdateStatus(ctx, copied, metav1.UpdateOptions{})
return err
}

ret := make([]*workapiv1.AppliedManifestWork, 0, len(items))
for _, item := range items {
appliedWork := item.(*workapiv1.AppliedManifestWork)
if appliedWork.Spec.HubHash == m.hubHash {
continue
}
ret = append(ret, item.(*workapiv1.AppliedManifestWork))
if now.Before(evictionStartTime.Add(m.evictionGracePeriod)) {
controllerContext.Queue().AddAfter(appliedManifestWork.Name, m.rateLimiter.When(appliedManifestWork.Name))
return nil
}

return ret, nil
klog.V(2).Infof("Delete appliedWork %s by agent %s after eviction grace period", appliedManifestWork.Name, m.agentID)
return m.appliedManifestWorkClient.Delete(ctx, appliedManifestWork.Name, metav1.DeleteOptions{})
}

func indexByWorkNameAndAgentID(obj interface{}) ([]string, error) {
appliedWork, ok := obj.(*workapiv1.AppliedManifestWork)
if !ok {
return []string{}, fmt.Errorf("obj is supposed to be a AppliedManifestWork, but is %T", obj)
func (m *UnManagedAppliedWorkController) stopToEvictAppliedManifestWork(ctx context.Context,
controllerContext factory.SyncContext, appliedManifestWork *workapiv1.AppliedManifestWork) error {
if appliedManifestWork.Status.EvictionStartTime == nil {
return nil
}

return []string{agentIDWorkNameIndex(appliedWork.Spec.ManifestWorkName, appliedWork.Spec.AgentID)}, nil
}

func agentIDWorkNameIndex(workName, agentID string) string {
return fmt.Sprintf("%s/%s", workName, agentID)
m.rateLimiter.Forget(appliedManifestWork.Name)
copied := appliedManifestWork.DeepCopy()
copied.Status.EvictionStartTime = nil
_, err := m.appliedManifestWorkClient.UpdateStatus(ctx, copied, metav1.UpdateOptions{})
return err
}

0 comments on commit ea5e66c

Please sign in to comment.