Skip to content

Commit

Permalink
make changes to binding-controller to adopt graceful eviction
Browse files Browse the repository at this point in the history
Signed-off-by: changzhen <changzhen5@huawei.com>
  • Loading branch information
XiShanYongYe-Chang committed Aug 5, 2022
1 parent 03eb521 commit b6c7320
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 64 deletions.
35 changes: 21 additions & 14 deletions pkg/controllers/binding/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,7 @@ func (c *ResourceBindingController) removeFinalizer(rb *workv1alpha2.ResourceBin

// syncBinding will sync resourceBinding to Works.
func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBinding) (controllerruntime.Result, error) {
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters, binding.Spec.RequiredBy)
works, err := helper.FindOrphanWorks(c.Client, binding.Namespace, binding.Name, clusterNames, apiextensionsv1.NamespaceScoped)
if err != nil {
klog.Errorf("Failed to find orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return controllerruntime.Result{Requeue: true}, err
}

err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil {
klog.Errorf("Failed to remove orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
if err := c.removeOrphanWorks(binding); err != nil {
return controllerruntime.Result{Requeue: true}, err
}

Expand Down Expand Up @@ -164,6 +151,26 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi
return controllerruntime.Result{}, nil
}

func (c *ResourceBindingController) removeOrphanWorks(binding *workv1alpha2.ResourceBinding) error {
works, err := helper.FindOrphanWorks(c.Client, binding.Namespace, binding.Name, helper.ObtainBindingSpecExistingClusters(binding.Spec))
if err != nil {
klog.Errorf("Failed to find orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return err
}

err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil {
klog.Errorf("Failed to remove orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return err
}

return nil
}

// updateResourceStatus will try to calculate the summary status and update to original object
// that the ResourceBinding refer to.
func (c *ResourceBindingController) updateResourceStatus(binding *workv1alpha2.ResourceBinding) error {
Expand Down
31 changes: 19 additions & 12 deletions pkg/controllers/binding/cluster_resource_binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,7 @@ func (c *ClusterResourceBindingController) removeFinalizer(crb *workv1alpha2.Clu

// syncBinding will sync clusterResourceBinding to Works.
func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.ClusterResourceBinding) (controllerruntime.Result, error) {
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters, binding.Spec.RequiredBy)
works, err := helper.FindOrphanWorks(c.Client, "", binding.Name, clusterNames, apiextensionsv1.ClusterScoped)
if err != nil {
klog.Errorf("Failed to find orphan works by ClusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return controllerruntime.Result{Requeue: true}, err
}

err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil {
klog.Errorf("Failed to remove orphan works by clusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
if err := c.removeOrphanWorks(binding); err != nil {
return controllerruntime.Result{Requeue: true}, err
}

Expand Down Expand Up @@ -145,6 +134,24 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu
return controllerruntime.Result{}, nil
}

func (c *ClusterResourceBindingController) removeOrphanWorks(binding *workv1alpha2.ClusterResourceBinding) error {
works, err := helper.FindOrphanWorks(c.Client, "", binding.Name, helper.ObtainBindingSpecExistingClusters(binding.Spec))
if err != nil {
klog.Errorf("Failed to find orphan works by ClusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return err
}

err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil {
klog.Errorf("Failed to remove orphan works by clusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
c.EventRecorder.Event(binding, corev1.EventTypeWarning, workv1alpha2.EventReasonCleanupWorkFailed, err.Error())
return err
}

return nil
}

// SetupWithManager creates a controller and register to controller manager.
func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error {
workFn := handler.MapFunc(
Expand Down
37 changes: 15 additions & 22 deletions pkg/util/helper/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sort"

corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -72,40 +71,34 @@ func HasScheduledReplica(scheduleResult []workv1alpha2.TargetCluster) bool {
return false
}

// GetBindingClusterNames will get clusterName list from bind clusters field and requiredBy field.
func GetBindingClusterNames(targetClusters []workv1alpha2.TargetCluster, bindingSnapshot []workv1alpha2.BindingSnapshot) []string {
clusterNames := util.ConvertToClusterNames(targetClusters)
for _, binding := range bindingSnapshot {
// ObtainBindingSpecExistingClusters will obtain the cluster slice existing in the binding's spec field.
func ObtainBindingSpecExistingClusters(bindingSpec workv1alpha2.ResourceBindingSpec) sets.String {
clusterNames := util.ConvertToClusterNames(bindingSpec.Clusters)
for _, binding := range bindingSpec.RequiredBy {
for _, targetCluster := range binding.Clusters {
clusterNames.Insert(targetCluster.Name)
}
}

return clusterNames.List()
for _, task := range bindingSpec.GracefulEvictionTasks {
clusterNames.Insert(task.FromCluster)
}

return clusterNames
}

// FindOrphanWorks retrieves all works that labeled with current binding(ResourceBinding or ClusterResourceBinding) objects,
// then pick the works that not meet current binding declaration.
func FindOrphanWorks(c client.Client, bindingNamespace, bindingName string, clusterNames []string, scope apiextensionsv1.ResourceScope) ([]workv1alpha1.Work, error) {
func FindOrphanWorks(c client.Client, bindingNamespace, bindingName string, expectClusters sets.String) ([]workv1alpha1.Work, error) {
var needJudgeWorks []workv1alpha1.Work
if scope == apiextensionsv1.NamespaceScoped {
workList, err := GetWorksByBindingNamespaceName(c, bindingNamespace, bindingName)
if err != nil {
klog.Errorf("Failed to get works by ResourceBinding(%s/%s): %v", bindingNamespace, bindingName, err)
return nil, err
}
needJudgeWorks = append(needJudgeWorks, workList.Items...)
} else {
workList, err := GetWorksByBindingNamespaceName(c, "", bindingName)
if err != nil {
klog.Errorf("Failed to get works by ClusterResourceBinding(%s): %v", bindingName, err)
return nil, err
}
needJudgeWorks = append(needJudgeWorks, workList.Items...)
workList, err := GetWorksByBindingNamespaceName(c, bindingNamespace, bindingName)
if err != nil {
klog.Errorf("Failed to get works by binding object (%s/%s): %v", bindingNamespace, bindingName, err)
return nil, err
}
needJudgeWorks = append(needJudgeWorks, workList.Items...)

var orphanWorks []workv1alpha1.Work
expectClusters := sets.NewString(clusterNames...)
for _, work := range needJudgeWorks {
workTargetCluster, err := names.GetClusterName(work.GetNamespace())
if err != nil {
Expand Down
15 changes: 5 additions & 10 deletions pkg/util/helper/workstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func AggregateClusterResourceBindingWorkStatus(c client.Client, binding *workv1a
}

func generateFullyAppliedCondition(spec workv1alpha2.ResourceBindingSpec, aggregatedStatuses []workv1alpha2.AggregatedStatusItem) metav1.Condition {
clusterNames := GetBindingClusterNames(spec.Clusters, spec.RequiredBy)
clusterNames := ObtainBindingSpecExistingClusters(spec)
if worksFullyApplied(aggregatedStatuses, clusterNames) {
return util.NewCondition(workv1alpha2.FullyApplied, FullyAppliedSuccessReason, FullyAppliedSuccessMessage, metav1.ConditionTrue)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func equalIdentifier(targetIdentifier *workv1alpha1.ResourceIdentifier, ordinal
}

// worksFullyApplied checks if all works are applied according the scheduled result and collected status.
func worksFullyApplied(aggregatedStatuses []workv1alpha2.AggregatedStatusItem, targetClusters []string) bool {
func worksFullyApplied(aggregatedStatuses []workv1alpha2.AggregatedStatusItem, targetClusters sets.String) bool {
// short path: not scheduled
if len(targetClusters) == 0 {
return false
Expand All @@ -246,17 +246,12 @@ func worksFullyApplied(aggregatedStatuses []workv1alpha2.AggregatedStatusItem, t
return false
}

targetClusterSet := sets.String{}
for i := range targetClusters {
targetClusterSet.Insert(targetClusters[i])
}

for _, aggregatedSatusItem := range aggregatedStatuses {
if !aggregatedSatusItem.Applied {
for _, aggregatedStatusItem := range aggregatedStatuses {
if !aggregatedStatusItem.Applied {
return false
}

if !targetClusterSet.Has(aggregatedSatusItem.ClusterName) {
if !targetClusters.Has(aggregatedStatusItem.ClusterName) {
return false
}
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/util/helper/workstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package helper
import (
"testing"

"k8s.io/apimachinery/pkg/util/sets"

workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)

func TestWorksFullyApplied(t *testing.T) {
type args struct {
aggregatedStatuses []workv1alpha2.AggregatedStatusItem
targetClusters []string
targetClusters sets.String
}
tests := []struct {
name string
Expand All @@ -33,7 +35,7 @@ func TestWorksFullyApplied(t *testing.T) {
name: "no aggregatedStatuses",
args: args{
aggregatedStatuses: nil,
targetClusters: []string{"member1"},
targetClusters: sets.NewString("member1"),
},
want: false,
},
Expand All @@ -46,7 +48,7 @@ func TestWorksFullyApplied(t *testing.T) {
Applied: true,
},
},
targetClusters: []string{"member1", "member2"},
targetClusters: sets.NewString("member1", "member2"),
},
want: false,
},
Expand All @@ -63,7 +65,7 @@ func TestWorksFullyApplied(t *testing.T) {
Applied: true,
},
},
targetClusters: []string{"member1", "member2"},
targetClusters: sets.NewString("member1", "member2"),
},
want: true,
},
Expand All @@ -80,7 +82,7 @@ func TestWorksFullyApplied(t *testing.T) {
Applied: false,
},
},
targetClusters: []string{"member1", "member2"},
targetClusters: sets.NewString("member1", "member2"),
},
want: false,
},
Expand All @@ -93,7 +95,7 @@ func TestWorksFullyApplied(t *testing.T) {
Applied: true,
},
},
targetClusters: []string{"member2"},
targetClusters: sets.NewString("member2"),
},
want: false,
},
Expand Down

0 comments on commit b6c7320

Please sign in to comment.