Skip to content

Commit

Permalink
Merge pull request #3990 from chaosi-zju/dev
Browse files Browse the repository at this point in the history
Response to priority change of ClusterPropagationPolicy
  • Loading branch information
karmada-bot committed Aug 24, 2023
2 parents 5e8a765 + 915c35d commit 720f51a
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 1 deletion.
48 changes: 47 additions & 1 deletion pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,14 +894,60 @@ func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) {
}

// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(_, newObj interface{}) {
func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) {
key, err := ClusterWideKeyFunc(newObj)
if err != nil {
return
}

klog.V(2).Infof("Update ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.Add(key)

// Temporary solution of corner case: After the priority(.spec.priority) of
// ClusterPropagationPolicy changed from high priority (e.g. 5) to low priority(e.g. 3),
// we should try to check if there is a ClusterPropagationPolicy(e.g. with priority 4)
// could preempt the targeted resources.
//
// Recognized limitations of the temporary solution are:
// - Too much logical processed in an event handler function will slow down
// the overall reconcile speed.
// - If there is an error raised during the process, the event will be lost
// and no second chance to retry.
//
// The idea of the long-term solution, perhaps ClusterPropagationPolicy could have
// a status, in that case we can record the observed priority(.status.observedPriority)
// which can be used to detect priority changes during reconcile logic.
if features.FeatureGate.Enabled(features.PolicyPreemption) {
var unstructuredOldObj *unstructured.Unstructured
var unstructuredNewObj *unstructured.Unstructured

unstructuredOldObj, err = helper.ToUnstructured(oldObj)
if err != nil {
klog.Errorf("Failed to transform oldObj, error: %v", err)
return
}
unstructuredNewObj, err = helper.ToUnstructured(newObj)
if err != nil {
klog.Errorf("Failed to transform newObj, error: %v", err)
return
}

var oldPolicy policyv1alpha1.ClusterPropagationPolicy
var newPolicy policyv1alpha1.ClusterPropagationPolicy

if err = helper.ConvertToTypedObject(unstructuredOldObj, &oldPolicy); err != nil {
klog.Errorf("Failed to convert typed ClusterPropagationPolicy(%s/%s): %v", unstructuredOldObj.GetNamespace(), unstructuredOldObj.GetName(), err)
return
}
if err = helper.ConvertToTypedObject(unstructuredNewObj, &newPolicy); err != nil {
klog.Errorf("Failed to convert typed ClusterPropagationPolicy(%s/%s): %v", newPolicy.GetNamespace(), newPolicy.GetName(), err)
return
}

if newPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
d.HandleDeprioritizedClusterPropagationPolicy(oldPolicy, newPolicy)
}
}
}

// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue.
Expand Down
46 changes: 46 additions & 0 deletions pkg/detector/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,49 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
}
}
}

// HandleDeprioritizedClusterPropagationPolicy responses to priority change of a ClusterPropagationPolicy,
// if the change is from high priority (e.g. 5) to low priority(e.g. 3), it will
// check if there is another ClusterPropagationPolicy could preempt the targeted resource,
// and put the ClusterPropagationPolicy in the queue to trigger preemption.
func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy policyv1alpha1.ClusterPropagationPolicy, newPolicy policyv1alpha1.ClusterPropagationPolicy) {
klog.Infof("ClusterPropagationPolicy(%s/%s) priority changed from %d to %d",
newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)

policies, err := d.clusterPropagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list ClusterPropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err)
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// higher priority ClusterPropagationPolicy be process first to avoid possible
// multiple preemption.

for i := range policies {
var potentialPolicy policyv1alpha1.ClusterPropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
klog.Errorf("Failed to convert typed ClusterPropagationPolicy: %v", err)
continue
}
// Re-queue the polies that enables preemption and with the priority
// in range (new priority, old priority).
// For the polices with higher priority than old priority, it can
// perform preempt automatically and don't need to re-queue here.
// For the polices with lower priority than new priority, it can't
// perform preempt as insufficient priority.
if potentialPolicy.Spec.Priority != nil &&
potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways &&
potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() &&
potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
var potentialKey util.QueueKey
potentialKey, err = ClusterWideKeyFunc(&potentialPolicy)
if err != nil {
return
}
klog.Infof("Enqueuing ClusterPropagationPolicy(%s/%s) in case of ClusterPropagationPolicy(%s/%s) priority changes",
potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
d.clusterPolicyReconcileWorker.Add(potentialKey)
}
}
}

0 comments on commit 720f51a

Please sign in to comment.