Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make changes to binding-controller to adopt graceful eviction #2339

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions pkg/controllers/binding/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,7 @@ func (c *ResourceBindingController) removeFinalizer(rb *workv1alpha2.ResourceBin

// syncBinding will sync resourceBinding to Works.
func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBinding) (controllerruntime.Result, error) {
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters, binding.Spec.RequiredBy)
works, err := helper.FindOrphanWorks(c.Client, binding.Namespace, binding.Name, clusterNames, apiextensionsv1.NamespaceScoped)
if err != nil {
klog.Errorf("Failed to find orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return controllerruntime.Result{Requeue: true}, err
}

err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil {
klog.Errorf("Failed to remove orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
if err := c.removeOrphanWorks(binding); err != nil {
return controllerruntime.Result{Requeue: true}, err
}

Expand Down Expand Up @@ -164,6 +151,26 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi
return controllerruntime.Result{}, nil
}

func (c *ResourceBindingController) removeOrphanWorks(binding *workv1alpha2.ResourceBinding) error {
works, err := helper.FindOrphanWorks(c.Client, binding.Namespace, binding.Name, helper.ObtainBindingSpecExistingClusters(binding.Spec))
if err != nil {
klog.Errorf("Failed to find orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return err
}

err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil {
klog.Errorf("Failed to remove orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return err
}

return nil
}

// updateResourceStatus will try to calculate the summary status and update to original object
// that the ResourceBinding refer to.
func (c *ResourceBindingController) updateResourceStatus(binding *workv1alpha2.ResourceBinding) error {
Expand Down
31 changes: 19 additions & 12 deletions pkg/controllers/binding/cluster_resource_binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,7 @@ func (c *ClusterResourceBindingController) removeFinalizer(crb *workv1alpha2.Clu

// syncBinding will sync clusterResourceBinding to Works.
func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.ClusterResourceBinding) (controllerruntime.Result, error) {
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters, binding.Spec.RequiredBy)
works, err := helper.FindOrphanWorks(c.Client, "", binding.Name, clusterNames, apiextensionsv1.ClusterScoped)
if err != nil {
klog.Errorf("Failed to find orphan works by ClusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return controllerruntime.Result{Requeue: true}, err
}

err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil {
klog.Errorf("Failed to remove orphan works by clusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
if err := c.removeOrphanWorks(binding); err != nil {
return controllerruntime.Result{Requeue: true}, err
}

Expand Down Expand Up @@ -145,6 +134,24 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu
return controllerruntime.Result{}, nil
}

func (c *ClusterResourceBindingController) removeOrphanWorks(binding *workv1alpha2.ClusterResourceBinding) error {
works, err := helper.FindOrphanWorks(c.Client, "", binding.Name, helper.ObtainBindingSpecExistingClusters(binding.Spec))
if err != nil {
klog.Errorf("Failed to find orphan works by ClusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return err
}

err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil {
klog.Errorf("Failed to remove orphan works by clusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return err
}

return nil
}

// SetupWithManager creates a controller and register to controller manager.
func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error {
workFn := handler.MapFunc(
Expand Down
37 changes: 15 additions & 22 deletions pkg/util/helper/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sort"

corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -72,40 +71,34 @@ func HasScheduledReplica(scheduleResult []workv1alpha2.TargetCluster) bool {
return false
}

// GetBindingClusterNames will get clusterName list from bind clusters field and requiredBy field.
func GetBindingClusterNames(targetClusters []workv1alpha2.TargetCluster, bindingSnapshot []workv1alpha2.BindingSnapshot) []string {
clusterNames := util.ConvertToClusterNames(targetClusters)
for _, binding := range bindingSnapshot {
// ObtainBindingSpecExistingClusters will obtain the cluster slice existing in the binding's spec field.
func ObtainBindingSpecExistingClusters(bindingSpec workv1alpha2.ResourceBindingSpec) sets.String {
clusterNames := util.ConvertToClusterNames(bindingSpec.Clusters)
for _, binding := range bindingSpec.RequiredBy {
for _, targetCluster := range binding.Clusters {
clusterNames.Insert(targetCluster.Name)
}
}

return clusterNames.List()
for _, task := range bindingSpec.GracefulEvictionTasks {
clusterNames.Insert(task.FromCluster)
}

return clusterNames
}

// FindOrphanWorks retrieves all works that labeled with current binding(ResourceBinding or ClusterResourceBinding) objects,
// then pick the works that not meet current binding declaration.
func FindOrphanWorks(c client.Client, bindingNamespace, bindingName string, clusterNames []string, scope apiextensionsv1.ResourceScope) ([]workv1alpha1.Work, error) {
func FindOrphanWorks(c client.Client, bindingNamespace, bindingName string, expectClusters sets.String) ([]workv1alpha1.Work, error) {
var needJudgeWorks []workv1alpha1.Work
if scope == apiextensionsv1.NamespaceScoped {
workList, err := GetWorksByBindingNamespaceName(c, bindingNamespace, bindingName)
if err != nil {
klog.Errorf("Failed to get works by ResourceBinding(%s/%s): %v", bindingNamespace, bindingName, err)
return nil, err
}
needJudgeWorks = append(needJudgeWorks, workList.Items...)
} else {
workList, err := GetWorksByBindingNamespaceName(c, "", bindingName)
if err != nil {
klog.Errorf("Failed to get works by ClusterResourceBinding(%s): %v", bindingName, err)
return nil, err
}
needJudgeWorks = append(needJudgeWorks, workList.Items...)
workList, err := GetWorksByBindingNamespaceName(c, bindingNamespace, bindingName)
if err != nil {
klog.Errorf("Failed to get works by binding object (%s/%s): %v", bindingNamespace, bindingName, err)
return nil, err
}
needJudgeWorks = append(needJudgeWorks, workList.Items...)

var orphanWorks []workv1alpha1.Work
expectClusters := sets.NewString(clusterNames...)
for _, work := range needJudgeWorks {
workTargetCluster, err := names.GetClusterName(work.GetNamespace())
if err != nil {
Expand Down
15 changes: 5 additions & 10 deletions pkg/util/helper/workstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func AggregateClusterResourceBindingWorkStatus(c client.Client, binding *workv1a
}

func generateFullyAppliedCondition(spec workv1alpha2.ResourceBindingSpec, aggregatedStatuses []workv1alpha2.AggregatedStatusItem) metav1.Condition {
clusterNames := GetBindingClusterNames(spec.Clusters, spec.RequiredBy)
clusterNames := ObtainBindingSpecExistingClusters(spec)
if worksFullyApplied(aggregatedStatuses, clusterNames) {
return util.NewCondition(workv1alpha2.FullyApplied, FullyAppliedSuccessReason, FullyAppliedSuccessMessage, metav1.ConditionTrue)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func equalIdentifier(targetIdentifier *workv1alpha1.ResourceIdentifier, ordinal
}

// worksFullyApplied checks if all works are applied according the scheduled result and collected status.
func worksFullyApplied(aggregatedStatuses []workv1alpha2.AggregatedStatusItem, targetClusters []string) bool {
func worksFullyApplied(aggregatedStatuses []workv1alpha2.AggregatedStatusItem, targetClusters sets.String) bool {
// short path: not scheduled
if len(targetClusters) == 0 {
return false
Expand All @@ -246,17 +246,12 @@ func worksFullyApplied(aggregatedStatuses []workv1alpha2.AggregatedStatusItem, t
return false
}

targetClusterSet := sets.String{}
for i := range targetClusters {
targetClusterSet.Insert(targetClusters[i])
}

for _, aggregatedSatusItem := range aggregatedStatuses {
if !aggregatedSatusItem.Applied {
for _, aggregatedStatusItem := range aggregatedStatuses {
if !aggregatedStatusItem.Applied {
return false
}

if !targetClusterSet.Has(aggregatedSatusItem.ClusterName) {
if !targetClusters.Has(aggregatedStatusItem.ClusterName) {
return false
}
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/util/helper/workstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package helper
import (
"testing"

"k8s.io/apimachinery/pkg/util/sets"

workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)

func TestWorksFullyApplied(t *testing.T) {
type args struct {
aggregatedStatuses []workv1alpha2.AggregatedStatusItem
targetClusters []string
targetClusters sets.String
}
tests := []struct {
name string
Expand All @@ -33,7 +35,7 @@ func TestWorksFullyApplied(t *testing.T) {
name: "no aggregatedStatuses",
args: args{
aggregatedStatuses: nil,
targetClusters: []string{"member1"},
targetClusters: sets.NewString("member1"),
},
want: false,
},
Expand All @@ -46,7 +48,7 @@ func TestWorksFullyApplied(t *testing.T) {
Applied: true,
},
},
targetClusters: []string{"member1", "member2"},
targetClusters: sets.NewString("member1", "member2"),
},
want: false,
},
Expand All @@ -63,7 +65,7 @@ func TestWorksFullyApplied(t *testing.T) {
Applied: true,
},
},
targetClusters: []string{"member1", "member2"},
targetClusters: sets.NewString("member1", "member2"),
},
want: true,
},
Expand All @@ -80,7 +82,7 @@ func TestWorksFullyApplied(t *testing.T) {
Applied: false,
},
},
targetClusters: []string{"member1", "member2"},
targetClusters: sets.NewString("member1", "member2"),
},
want: false,
},
Expand All @@ -93,7 +95,7 @@ func TestWorksFullyApplied(t *testing.T) {
Applied: true,
},
},
targetClusters: []string{"member2"},
targetClusters: sets.NewString("member2"),
},
want: false,
},
Expand Down