Merge pull request #3381 from XiShanYongYe-Chang/e2e-case-clusteraffinities

flake: [ClusterAffinities] propagation testing
karmada-bot committed Apr 27, 2023
2 parents 90829e8 + 467adcf commit ec7b3b1
Showing 2 changed files with 92 additions and 44 deletions.
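
At a glance, the diff below does two things: it moves the cluster-update fan-out out of the informer event handler into a dedicated clusterReconcileWorker, and it requeues any binding whose SchedulerObservedGeneration lags its Generation so a cluster event is not missed while the binding is still waiting to be scheduled. The sketch that follows illustrates only the first part, the generic add-to-queue/reconcile pattern, built directly on client-go's workqueue rather than on Karmada's util.AsyncWorker; the cluster struct is a hypothetical stand-in for clusterv1alpha1.Cluster.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// cluster is a hypothetical stand-in for clusterv1alpha1.Cluster, reduced to
// the fields this illustration needs; it must be comparable so it can serve
// as a workqueue item.
type cluster struct {
	Name       string
	Generation int64
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()

	// The update handler no longer walks the binding listers itself; it only
	// adds the whole old and new objects (not just a key) so the reconciler
	// can match either manifest against binding affinities, mirroring
	// clusterReconcileWorker.Add(oldCluster) / Add(newCluster).
	updateHandler := func(oldObj, newObj cluster) {
		queue.Add(oldObj)
		queue.Add(newObj)
	}

	// reconcile stands in for Scheduler.reconcileCluster.
	reconcile := func(c cluster) error {
		fmt.Printf("reschedule bindings affected by cluster %s (generation %d)\n", c.Name, c.Generation)
		return nil
	}

	// A single worker goroutine drains the queue, mirroring
	// clusterReconcileWorker.Run(1, stopCh).
	go func() {
		for {
			item, shutdown := queue.Get()
			if shutdown {
				return
			}
			err := reconcile(item.(cluster))
			queue.Done(item)
			if err != nil {
				queue.AddRateLimited(item) // retry later on failure
			} else {
				queue.Forget(item)
			}
		}
	}()

	updateHandler(
		cluster{Name: "member1", Generation: 1},
		cluster{Name: "member1", Generation: 2},
	)
	time.Sleep(100 * time.Millisecond)
}

In the actual change, util.NewAsyncWorker wires this up with a named worker and a ReconcileFunc, and Run(1, stopCh) starts the single worker goroutine, as the scheduler.go hunks further down show.
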
pkg/scheduler/event_handler.go (126 changes: 82 additions & 44 deletions)
@@ -1,11 +1,14 @@
package scheduler

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@@ -186,12 +189,60 @@ func (s *Scheduler) updateCluster(oldObj, newObj interface{}) {
case !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels):
fallthrough
case oldCluster.Generation != newCluster.Generation:
s.enqueueAffectedBindings(oldCluster, newCluster)
// To distinguish the old and new cluster objects, we need to add the entire object
// to the worker. Therefore, call Add func instead of Enqueue func.
s.clusterReconcileWorker.Add(oldCluster)
s.clusterReconcileWorker.Add(newCluster)
}
}

func (s *Scheduler) deleteCluster(obj interface{}) {
var cluster *clusterv1alpha1.Cluster
switch t := obj.(type) {
case *clusterv1alpha1.Cluster:
cluster = t
case cache.DeletedFinalStateUnknown:
var ok bool
cluster, ok = t.Obj.(*clusterv1alpha1.Cluster)
if !ok {
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t)
return
}

klog.V(3).Infof("Delete event for cluster %s", cluster.Name)

if s.enableSchedulerEstimator {
s.schedulerEstimatorWorker.Add(cluster.Name)
}
}

func schedulerNameFilter(schedulerNameFromOptions, schedulerName string) bool {
if schedulerName == "" {
schedulerName = DefaultScheduler
}

return schedulerNameFromOptions == schedulerName
}

func (s *Scheduler) reconcileCluster(key util.QueueKey) error {
cluster, ok := key.(*clusterv1alpha1.Cluster)
if !ok {
return fmt.Errorf("invalid cluster key: %s", key)
}
return utilerrors.NewAggregate([]error{
s.enqueueAffectedBindings(cluster),
s.enqueueAffectedCRBs(cluster)},
)
}

// enqueueAffectedBinding find all RBs/CRBs related to the cluster and reschedule them
func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alpha1.Cluster) {
// enqueueAffectedBindings finds all RBs related to the cluster and reschedules them
func (s *Scheduler) enqueueAffectedBindings(cluster *clusterv1alpha1.Cluster) error {
klog.V(4).Infof("Enqueue affected ResourceBinding with cluster %s", cluster.Name)

bindings, _ := s.bindingLister.List(labels.Everything())
for _, binding := range bindings {
placementPtr := binding.Spec.Placement
@@ -202,6 +253,14 @@ func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alp

var affinity *policyv1alpha1.ClusterAffinity
if placementPtr.ClusterAffinities != nil {
if binding.Status.SchedulerObservedGeneration != binding.Generation {
// Hit here means the binding may still be in the queue waiting
// for scheduling or its status has not been synced to the
// cache. Just enqueue the binding to avoid missing the cluster
// update event.
s.onResourceBindingRequeue(binding, metrics.ClusterChanged)
continue
}
affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
} else {
@@ -212,15 +271,19 @@ func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alp
case affinity == nil:
// If no clusters specified, add it to the queue
fallthrough
case util.ClusterMatches(newCluster, *affinity):
// If the new cluster manifest match the affinity, add it to the queue, trigger rescheduling
fallthrough
case util.ClusterMatches(oldCluster, *affinity):
// If the old cluster manifest match the affinity, add it to the queue, trigger rescheduling
case util.ClusterMatches(cluster, *affinity):
// If the cluster manifest matches the affinity, add it to the queue to trigger rescheduling
s.onResourceBindingRequeue(binding, metrics.ClusterChanged)
}
}

return nil
}

// enqueueAffectedCRBs finds all CRBs related to the cluster and reschedules them
func (s *Scheduler) enqueueAffectedCRBs(cluster *clusterv1alpha1.Cluster) error {
klog.V(4).Infof("Enqueue affected ClusterResourceBinding with cluster %s", cluster.Name)

clusterBindings, _ := s.clusterBindingLister.List(labels.Everything())
for _, binding := range clusterBindings {
placementPtr := binding.Spec.Placement
@@ -231,6 +294,14 @@ func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alp

var affinity *policyv1alpha1.ClusterAffinity
if placementPtr.ClusterAffinities != nil {
if binding.Status.SchedulerObservedGeneration != binding.Generation {
// Hit here means the binding may still be in the queue waiting
// for scheduling or its status has not been synced to the
// cache. Just enqueue the binding to avoid missing the cluster
// update event.
s.onClusterResourceBindingRequeue(binding, metrics.ClusterChanged)
continue
}
affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
} else {
@@ -241,44 +312,11 @@ func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alp
case affinity == nil:
// If no clusters specified, add it to the queue
fallthrough
case util.ClusterMatches(newCluster, *affinity):
// If the new cluster manifest match the affinity, add it to the queue, trigger rescheduling
fallthrough
case util.ClusterMatches(oldCluster, *affinity):
// If the old cluster manifest match the affinity, add it to the queue, trigger rescheduling
case util.ClusterMatches(cluster, *affinity):
// If the cluster manifest matches the affinity, add it to the queue to trigger rescheduling
s.onClusterResourceBindingRequeue(binding, metrics.ClusterChanged)
}
}
}

func (s *Scheduler) deleteCluster(obj interface{}) {
var cluster *clusterv1alpha1.Cluster
switch t := obj.(type) {
case *clusterv1alpha1.Cluster:
cluster = t
case cache.DeletedFinalStateUnknown:
var ok bool
cluster, ok = t.Obj.(*clusterv1alpha1.Cluster)
if !ok {
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t)
return
}

klog.V(3).Infof("Delete event for cluster %s", cluster.Name)

if s.enableSchedulerEstimator {
s.schedulerEstimatorWorker.Add(cluster.Name)
}
}

func schedulerNameFilter(schedulerNameFromOptions, schedulerName string) bool {
if schedulerName == "" {
schedulerName = DefaultScheduler
}

return schedulerNameFromOptions == schedulerName
return nil
}
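
In reconcileCluster above, both enqueue helpers run for the same cluster key and their results are folded into a single error with utilerrors.NewAggregate, so a partial failure still surfaces to the async worker. Below is a minimal, self-contained sketch of just that aggregation behavior; enqueue is a hypothetical stand-in for enqueueAffectedBindings/enqueueAffectedCRBs, not Karmada code.

package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// enqueue is a hypothetical stand-in for the scheduler's enqueue helpers and
// exists only for this illustration.
func enqueue(kind string, fail bool) error {
	if fail {
		return fmt.Errorf("failed to enqueue affected %s", kind)
	}
	return nil
}

func main() {
	// NewAggregate drops nil entries and returns nil when every step
	// succeeded, so a fully successful reconcile reports no error.
	err := utilerrors.NewAggregate([]error{
		enqueue("ResourceBindings", false),
		enqueue("ClusterResourceBindings", false),
	})
	fmt.Println(err == nil) // true

	// With one failure the aggregate is non-nil and carries that message,
	// so the reconcile reports failure and the key can be retried.
	err = utilerrors.NewAggregate([]error{
		enqueue("ResourceBindings", false),
		enqueue("ClusterResourceBindings", true),
	})
	fmt.Println(err) // failed to enqueue affected ClusterResourceBindings
}
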
pkg/scheduler/scheduler.go (10 changes: 10 additions & 0 deletions)
@@ -74,6 +74,9 @@ type Scheduler struct {
clusterLister clusterlister.ClusterLister
informerFactory informerfactory.SharedInformerFactory

// clusterReconcileWorker reconciles cluster changes to trigger corresponding
// ResourceBinding/ClusterResourceBinding rescheduling.
clusterReconcileWorker util.AsyncWorker
// TODO: implement a priority scheduling queue
queue workqueue.RateLimitingInterface

@@ -218,6 +221,11 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
schedulerCache: schedulerCache,
}

sched.clusterReconcileWorker = util.NewAsyncWorker(util.Options{
Name: "ClusterReconcileWorker",
ReconcileFunc: sched.reconcileCluster,
})

if options.enableSchedulerEstimator {
sched.enableSchedulerEstimator = options.enableSchedulerEstimator
sched.disableSchedulerEstimatorInPullMode = options.disableSchedulerEstimatorInPullMode
@@ -255,6 +263,8 @@ func (s *Scheduler) Run(ctx context.Context) {
s.informerFactory.Start(stopCh)
s.informerFactory.WaitForCacheSync(stopCh)

s.clusterReconcileWorker.Run(1, stopCh)

go wait.Until(s.worker, time.Second, stopCh)

<-stopCh
