appliedmanifestwork eviction #190
@@ -2,17 +2,20 @@ package finalizercontroller
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"strings"
+	"time"
 
+	jsonpatch "github.com/evanphx/json-patch"
 	"github.com/openshift/library-go/pkg/controller/factory"
 	"github.com/openshift/library-go/pkg/operator/events"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	utilerrors "k8s.io/apimachinery/pkg/util/errors"
-	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/client-go/tools/cache"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
 	workinformer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
@@ -21,134 +24,145 @@ import (
 	"open-cluster-management.io/work/pkg/helper"
 )
 
-const byWorkNameAndAgentID = "UnManagedAppliedManifestWork-byWorkNameAndAgentID"
-
 // UnManagedAppliedWorkController deletes unmanaged applied works.
-type UnManagedAppliedWorkController struct {
-	manifestWorkLister         worklister.ManifestWorkNamespaceLister
-	appliedManifestWorkClient  workv1client.AppliedManifestWorkInterface
-	appliedManifestWorkLister  worklister.AppliedManifestWorkLister
-	appliedManifestWorkIndexer cache.Indexer
-	hubHash                    string
-	agentID                    string
+type unmanagedAppliedWorkController struct {
+	manifestWorkLister        worklister.ManifestWorkNamespaceLister
+	appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
+	appliedManifestWorkLister worklister.AppliedManifestWorkLister
+	hubHash                   string
+	agentID                   string
+	evictionGracePeriod       time.Duration
+	rateLimiter               workqueue.RateLimiter
 }
 
+// NewUnManagedAppliedWorkController returns a controller to evict the unmanaged appliedmanifestworks.
+//
+// An appliedmanifestwork will be considered unmanaged in the following scenarios:
+//   - the manifestwork of the current appliedmanifestwork is missing on the hub, or
+//   - the appliedmanifestwork hub hash does not match the current hub hash of the work agent.
+//
+// One unmanaged appliedmanifestwork will be evicted from the managed cluster after a grace period (by
+// default, 10 minutes); after one appliedmanifestwork is evicted from the managed cluster, its owned
+// resources will also be evicted from the managed cluster with Kubernetes garbage collection.
 func NewUnManagedAppliedWorkController(
 	recorder events.Recorder,
 	manifestWorkInformer workinformer.ManifestWorkInformer,
 	manifestWorkLister worklister.ManifestWorkNamespaceLister,
 	appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
 	appliedManifestWorkInformer workinformer.AppliedManifestWorkInformer,
+	evictionGracePeriod time.Duration,
 	hubHash, agentID string,
 ) factory.Controller {
-	controller := &UnManagedAppliedWorkController{
-		manifestWorkLister:         manifestWorkLister,
-		appliedManifestWorkClient:  appliedManifestWorkClient,
-		appliedManifestWorkLister:  appliedManifestWorkInformer.Lister(),
-		appliedManifestWorkIndexer: appliedManifestWorkInformer.Informer().GetIndexer(),
-		hubHash:                    hubHash,
-		agentID:                    agentID,
-	}
-
-	err := appliedManifestWorkInformer.Informer().AddIndexers(cache.Indexers{
-		byWorkNameAndAgentID: indexByWorkNameAndAgentID,
-	})
-	if err != nil {
-		utilruntime.HandleError(err)
-	}
+	controller := &unmanagedAppliedWorkController{
+		manifestWorkLister:        manifestWorkLister,
+		appliedManifestWorkClient: appliedManifestWorkClient,
+		appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
+		hubHash:                   hubHash,
+		agentID:                   agentID,
+		evictionGracePeriod:       evictionGracePeriod,
+		rateLimiter:               workqueue.NewItemExponentialFailureRateLimiter(1*time.Minute, evictionGracePeriod),
+	}
 
 	return factory.New().
+		WithInformersQueueKeyFunc(func(obj runtime.Object) string {
+			accessor, _ := meta.Accessor(obj)
+			return fmt.Sprintf("%s-%s", hubHash, accessor.GetName())
+		}, manifestWorkInformer.Informer()).
+		WithFilteredEventsInformersQueueKeyFunc(func(obj runtime.Object) string {
+			accessor, _ := meta.Accessor(obj)
+			return accessor.GetName()
+		}, helper.AppliedManifestworkHubHashFilter(hubHash), appliedManifestWorkInformer.Informer()).
 		WithFilteredEventsInformersQueueKeyFunc(
 			func(obj runtime.Object) string {
 				accessor, _ := meta.Accessor(obj)
 				return accessor.GetName()
 			}, helper.AppliedManifestworkAgentIDFilter(agentID), appliedManifestWorkInformer.Informer()).
 		WithSync(controller.sync).ToController("UnManagedAppliedManifestWork", recorder)
 }
 
Review thread on the added manifestwork event handler:
- Reviewer: if we do not add this event handler, how could the controller know when a manifestwork is deleted?
- Reviewer: I think this controller will not handle the manifestwork deletion; it just needs to know whether the relating manifestwork can be found on the hub or not, right?
- Author: added, and I also removed https://github.com/open-cluster-management-io/work/pull/190/files#diff-7d50afd5fa68dd772f0ace48c6b0b06cb79e670b3604840100f9db9a89d6ed59R74, so that we have a uniform way to handle the case where the manifestwork is missing on the hub.

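A side note on why that queue key lines up (my illustration, not from the PR): the work agent names each AppliedManifestWork on the managed cluster as "<hubHash>-<manifestWorkName>", so combining the hub hash with the name of a ManifestWork event from the hub reproduces exactly the key that sync expects. A toy sketch, assuming that naming convention:

package main

import "fmt"

func main() {
	hubHash := "abcd1234"        // placeholder hash identifying the hub
	manifestWorkName := "work-1" // the ManifestWork's name on the hub

	// Key enqueued by the WithInformersQueueKeyFunc handler above.
	key := fmt.Sprintf("%s-%s", hubHash, manifestWorkName)
	fmt.Println(key) // "abcd1234-work-1" == the AppliedManifestWork name on the spoke
}
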
-func (m *UnManagedAppliedWorkController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
+func (m *unmanagedAppliedWorkController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
 	appliedManifestWorkName := controllerContext.QueueKey()
-	klog.V(4).Infof("Reconciling ManifestWork %q", appliedManifestWorkName)
+	klog.V(4).Infof("Reconciling AppliedManifestWork %q", appliedManifestWorkName)
 
 	appliedManifestWork, err := m.appliedManifestWorkLister.Get(appliedManifestWorkName)
 	if errors.IsNotFound(err) {
-		// work not found, could have been deleted, do nothing.
+		// appliedmanifestwork not found, could have been deleted, do nothing.
 		return nil
 	}
 	if err != nil {
 		return err
 	}
 
-	// We delete the old AppliedManifestWork only when the related ManifestWork is applied with the new
-	// AppliedManifestWork as the new owner. This can avoid deleting the old AppliedManifestWork prematurely
-	// before the new AppliedManifestWork takes the ownership of the applied resources.
-	manifestWork, err := m.manifestWorkLister.Get(appliedManifestWork.Spec.ManifestWorkName)
+	_, err = m.manifestWorkLister.Get(appliedManifestWork.Spec.ManifestWorkName)
 	if errors.IsNotFound(err) {
-		// work not found, could have been deleted, do nothing.
-		return nil
+		// evict the current appliedmanifestwork when its relating manifestwork is missing on the hub
+		return m.evictAppliedManifestWork(ctx, controllerContext, appliedManifestWork)
 	}
 	if err != nil {
 		return err
 	}
-	if !meta.IsStatusConditionTrue(manifestWork.Status.Conditions, workapiv1.WorkApplied) {
-		// the work is not applied, do nothing.
-		return nil
-	}
 
-	unManagedAppliedWorks, err := m.getUnManagedAppliedManifestWorksByIndex(appliedManifestWork.Spec.ManifestWorkName, appliedManifestWork.Spec.AgentID)
-	if err != nil {
-		return err
+	// manifestwork exists but hub changed
+	if !strings.HasPrefix(appliedManifestWork.Name, m.hubHash) {
+		return m.evictAppliedManifestWork(ctx, controllerContext, appliedManifestWork)
 	}
 
-	var errs []error
-	for _, appliedWork := range unManagedAppliedWorks {
-		klog.V(2).Infof("Delete appliedWork %s since it is not managed by agent %s anymore", appliedWork.Name, m.agentID)
-		err := m.appliedManifestWorkClient.Delete(ctx, appliedWork.Name, metav1.DeleteOptions{})
-		if err != nil {
-			errs = append(errs, err)
-		}
-	}
-
-	return utilerrors.NewAggregate(errs)
+	// stop to evict the current appliedmanifestwork when its relating manifestwork is recreated on the hub
+	return m.stopToEvictAppliedManifestWork(ctx, appliedManifestWork)
 }
 
-// getUnManagedAppliedManifestWorksByIndex finds appliedmanifestwork with the same workname and agent ID but different hubhash.
-// These appliedManifestWorks is considered to be not managed by this work agent anymore and should be deleted.
-// The reason of marking them as unmanaged is because the only reason under this conditions is work agent is switched to connect
-// to a recovered hub or a fresh new hub. Those appliedmanifestwork needs to be deleted to avoid conflict with the newly connected
-// hub.
-func (m *UnManagedAppliedWorkController) getUnManagedAppliedManifestWorksByIndex(workName, agentID string) ([]*workapiv1.AppliedManifestWork, error) {
-	index := agentIDWorkNameIndex(workName, agentID)
-	items, err := m.appliedManifestWorkIndexer.ByIndex(byWorkNameAndAgentID, index)
-	if err != nil {
-		return nil, err
-	}
-
-	ret := make([]*workapiv1.AppliedManifestWork, 0, len(items))
-	for _, item := range items {
-		appliedWork := item.(*workapiv1.AppliedManifestWork)
-		if appliedWork.Spec.HubHash == m.hubHash {
-			continue
-		}
-		ret = append(ret, item.(*workapiv1.AppliedManifestWork))
-	}
-
-	return ret, nil
-}
-
-func indexByWorkNameAndAgentID(obj interface{}) ([]string, error) {
-	appliedWork, ok := obj.(*workapiv1.AppliedManifestWork)
-	if !ok {
-		return []string{}, fmt.Errorf("obj is supposed to be a AppliedManifestWork, but is %T", obj)
-	}
-
-	return []string{agentIDWorkNameIndex(appliedWork.Spec.ManifestWorkName, appliedWork.Spec.AgentID)}, nil
-}
-
-func agentIDWorkNameIndex(workName, agentID string) string {
-	return fmt.Sprintf("%s/%s", workName, agentID)
-}
+func (m *unmanagedAppliedWorkController) evictAppliedManifestWork(ctx context.Context,
+	controllerContext factory.SyncContext, appliedManifestWork *workapiv1.AppliedManifestWork) error {
+	now := time.Now()
+
+	evictionStartTime := appliedManifestWork.Status.EvictionStartTime
+	if evictionStartTime == nil {
+		return m.patchEvictionStartTime(ctx, appliedManifestWork, &metav1.Time{Time: now})
+	}
+
+	if now.Before(evictionStartTime.Add(m.evictionGracePeriod)) {
+		controllerContext.Queue().AddAfter(appliedManifestWork.Name, m.rateLimiter.When(appliedManifestWork.Name))
+		return nil
+	}
+
+	klog.V(2).Infof("Delete appliedWork %s by agent %s after eviction grace period", appliedManifestWork.Name, m.agentID)
+	return m.appliedManifestWorkClient.Delete(ctx, appliedManifestWork.Name, metav1.DeleteOptions{})
+}
+
+func (m *unmanagedAppliedWorkController) stopToEvictAppliedManifestWork(
+	ctx context.Context, appliedManifestWork *workapiv1.AppliedManifestWork) error {
+	if appliedManifestWork.Status.EvictionStartTime == nil {
+		return nil
+	}
+
+	m.rateLimiter.Forget(appliedManifestWork.Name)
+	return m.patchEvictionStartTime(ctx, appliedManifestWork, nil)
+}
+
+func (m *unmanagedAppliedWorkController) patchEvictionStartTime(ctx context.Context,
+	appliedManifestWork *workapiv1.AppliedManifestWork, evictionStartTime *metav1.Time) error {
+	oldData, err := json.Marshal(workapiv1.AppliedManifestWork{
+		Status: workapiv1.AppliedManifestWorkStatus{
+			EvictionStartTime: appliedManifestWork.Status.EvictionStartTime,
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("failed to Marshal old data for appliedmanifestwork status %s: %w", appliedManifestWork.Name, err)
+	}
+
+	newData, err := json.Marshal(workapiv1.AppliedManifestWork{
+		ObjectMeta: metav1.ObjectMeta{
+			UID:             appliedManifestWork.UID,
+			ResourceVersion: appliedManifestWork.ResourceVersion,
+		},
+		Status: workapiv1.AppliedManifestWorkStatus{
+			EvictionStartTime: evictionStartTime,
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("failed to Marshal new data for appliedmanifestwork status %s: %w", appliedManifestWork.Name, err)
+	}
+
+	patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+	if err != nil {
+		return fmt.Errorf("failed to create patch for cluster %s: %w", appliedManifestWork.Name, err)
+	}
+
+	_, err = m.appliedManifestWorkClient.Patch(ctx, appliedManifestWork.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	return err
+}
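For reference, here is a minimal standalone sketch (not part of the PR) of what the merge patch built by patchEvictionStartTime looks like on the wire. The structs are simplified stand-ins for the workapiv1 types, modeling only the evictionStartTime field; the point is that jsonpatch.CreateMergePatch emits the field's new value when eviction starts, and an explicit JSON null when eviction stops, and under types.MergePatchType that null is what removes the field from status on the server.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	jsonpatch "github.com/evanphx/json-patch"
)

// Simplified stand-ins for workapiv1.AppliedManifestWork and its status.
type status struct {
	EvictionStartTime *time.Time `json:"evictionStartTime,omitempty"`
}

type appliedManifestWork struct {
	Status status `json:"status"`
}

func main() {
	now := time.Now()

	// Starting eviction: nil -> timestamp.
	oldData, _ := json.Marshal(appliedManifestWork{})
	newData, _ := json.Marshal(appliedManifestWork{Status: status{EvictionStartTime: &now}})
	patch, _ := jsonpatch.CreateMergePatch(oldData, newData)
	fmt.Println(string(patch)) // e.g. {"status":{"evictionStartTime":"2024-01-02T15:04:05Z"}}

	// Stopping eviction: timestamp -> nil. The patch carries an explicit null,
	// which deletes the field on the server with a merge patch.
	oldData, _ = json.Marshal(appliedManifestWork{Status: status{EvictionStartTime: &now}})
	newData, _ = json.Marshal(appliedManifestWork{})
	patch, _ = jsonpatch.CreateMergePatch(oldData, newData)
	fmt.Println(string(patch)) // {"status":{"evictionStartTime":null}}
}

On pacing: workqueue.NewItemExponentialFailureRateLimiter(1*time.Minute, evictionGracePeriod) makes the requeue delays in evictAppliedManifestWork grow as 1m, 2m, 4m, and so on, capped at the grace period, so an appliedmanifestwork under eviction is re-checked a handful of times rather than on every sync.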
Review thread on the controller:
- Reviewer: let's add some doc for this controller.
- Author: added
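As additional context for the doc request above, here is a hypothetical sketch of how this controller might be wired into a work agent. Everything other than the NewUnManagedAppliedWorkController signature is an assumption for illustration: the generated clientset/informer packages from open-cluster-management.io/api, the finalizercontroller import path, the library-go in-memory recorder, and the placeholder identities and kubeconfig paths.

package main

import (
	"context"
	"time"

	"github.com/openshift/library-go/pkg/operator/events"
	"k8s.io/client-go/tools/clientcmd"
	workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
	workinformers "open-cluster-management.io/api/client/work/informers/externalversions"

	"open-cluster-management.io/work/pkg/spoke/controllers/finalizercontroller" // assumed package path
)

func main() {
	ctx := context.Background()

	// Placeholder identities; the real agent derives these from its configuration.
	clusterName, hubHash, agentID := "cluster1", "abcd1234", "agent-uid"

	hubConfig, err := clientcmd.BuildConfigFromFlags("", "/path/to/hub.kubeconfig")
	if err != nil {
		panic(err)
	}
	spokeConfig, err := clientcmd.BuildConfigFromFlags("", "/path/to/spoke.kubeconfig")
	if err != nil {
		panic(err)
	}

	hubWorkClient := workclientset.NewForConfigOrDie(hubConfig)
	spokeWorkClient := workclientset.NewForConfigOrDie(spokeConfig)

	// ManifestWorks live on the hub in the managed cluster's namespace;
	// AppliedManifestWorks are cluster-scoped on the managed cluster.
	hubInformers := workinformers.NewSharedInformerFactoryWithOptions(
		hubWorkClient, 5*time.Minute, workinformers.WithNamespace(clusterName))
	spokeInformers := workinformers.NewSharedInformerFactory(spokeWorkClient, 5*time.Minute)

	manifestWorkInformer := hubInformers.Work().V1().ManifestWorks()
	appliedManifestWorkInformer := spokeInformers.Work().V1().AppliedManifestWorks()

	controller := finalizercontroller.NewUnManagedAppliedWorkController(
		events.NewInMemoryRecorder("work-agent"),
		manifestWorkInformer,
		manifestWorkInformer.Lister().ManifestWorks(clusterName),
		spokeWorkClient.WorkV1().AppliedManifestWorks(),
		appliedManifestWorkInformer,
		10*time.Minute, // evictionGracePeriod: the default mentioned in the doc comment
		hubHash, agentID,
	)

	hubInformers.Start(ctx.Done())
	spokeInformers.Start(ctx.Done())
	controller.Run(ctx, 1)
}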