[feature]support rescheduling when deleting a cluster
Signed-off-by: huone1 <huwanxing@huawei.com>
huone1 committed Mar 3, 2022
1 parent e02b5d3 commit f7e6ecd
Showing 6 changed files with 195 additions and 141 deletions.
27 changes: 13 additions & 14 deletions pkg/scheduler/core/division_algorithm.go
@@ -34,10 +34,13 @@ func divideReplicasByResource(
spec *workv1alpha2.ResourceBindingSpec,
preference policyv1alpha1.ReplicaDivisionPreference,
) ([]workv1alpha2.TargetCluster, error) {
// Step 1: Get previous total sum of replicas.
assignedReplicas := util.GetSumOfReplicas(spec.Clusters)
// Step 1: Find the ready clusters that have old replicas
scheduledClusters := findOutScheduledCluster(spec.Clusters, clusters)

// Step 2: Check the scale type (up or down).
// Step 2: Calculate the replicas already assigned in scheduledClusters
assignedReplicas := util.GetSumOfReplicas(scheduledClusters)

// Step 3: Check the scale type (up or down).
if assignedReplicas > spec.Replicas {
// We need to reduce the replicas in terms of the previous result.
newTargetClusters, err := scaleDownScheduleByReplicaDivisionPreference(spec, preference)
@@ -48,7 +51,7 @@
} else if assignedReplicas < spec.Replicas {
// We need to enlarge the replicas in terms of the previous result (if exists).
// First scheduling is considered as a special kind of scaling up.
newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(clusters, spec, preference)
newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(clusters, spec, preference, scheduledClusters, assignedReplicas)
if err != nil {
return nil, fmt.Errorf("failed to scaleUp: %v", err)
}
@@ -202,24 +205,20 @@ func scaleUpScheduleByReplicaDivisionPreference(
clusters []*clusterv1alpha1.Cluster,
spec *workv1alpha2.ResourceBindingSpec,
preference policyv1alpha1.ReplicaDivisionPreference,
scheduledClusters []workv1alpha2.TargetCluster,
assignedReplicas int32,
) ([]workv1alpha2.TargetCluster, error) {
// Step 1: Find the clusters that have old replicas, so we can prefer to assign new replicas towards them.
scheduledClusters := findOutScheduledCluster(spec.Clusters, clusters)

// Step 2: calculate the assigned Replicas in scheduledClusters
assignedReplicas := util.GetSumOfReplicas(scheduledClusters)

// Step 3: Get how many replicas should be scheduled in this cycle and construct a new object if necessary
// Step 1: Get how many replicas should be scheduled in this cycle and construct a new object if necessary
newSpec := spec
if assignedReplicas > 0 {
newSpec = spec.DeepCopy()
newSpec.Replicas = spec.Replicas - assignedReplicas
}

// Step 4: Calculate available replicas of all candidates
// Step 2: Calculate available replicas of all candidates
clusterAvailableReplicas := calAvailableReplicas(clusters, newSpec)

// Step 5: Begin dividing.
// Step 3: Begin dividing.
// Only the new replicas are considered in this scheduling cycle; the old replicas will not be moved.
// Otherwise, the old replicas might be recreated, which is not expected during scaling up.
// The parameter `scheduledClusterNames` is used to make sure that we assign new replicas to them preferentially
@@ -230,6 +229,6 @@
return result, err
}

// Step 6: Merge the result of previous and new results.
// Step 4: Merge the result of previous and new results.
return util.MergeTargetClusters(scheduledClusters, result), nil
}
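For readers following the new scale-up path, here is a minimal, self-contained sketch of the flow it implements: sum the replicas already held by the still-scheduled clusters, schedule only the remaining delta, then merge the new assignment into the previous one. The TargetCluster type and the helpers below are simplified stand-ins for the workv1alpha2 type and the util.GetSumOfReplicas / util.MergeTargetClusters helpers, not the actual Karmada implementations.

```go
package main

import "fmt"

// TargetCluster is a simplified stand-in for workv1alpha2.TargetCluster.
type TargetCluster struct {
	Name     string
	Replicas int32
}

// sumOfReplicas plays the role of util.GetSumOfReplicas.
func sumOfReplicas(clusters []TargetCluster) int32 {
	var sum int32
	for _, c := range clusters {
		sum += c.Replicas
	}
	return sum
}

// mergeTargetClusters plays the role of util.MergeTargetClusters: replicas are
// summed per cluster name, so replicas from the previous result never move.
func mergeTargetClusters(previous, added []TargetCluster) []TargetCluster {
	byName := map[string]int32{}
	for _, c := range previous {
		byName[c.Name] += c.Replicas
	}
	for _, c := range added {
		byName[c.Name] += c.Replicas
	}
	merged := make([]TargetCluster, 0, len(byName))
	for name, replicas := range byName {
		merged = append(merged, TargetCluster{Name: name, Replicas: replicas})
	}
	return merged
}

func main() {
	// Previous scheduling result on the clusters that are still ready.
	scheduled := []TargetCluster{{Name: "member1", Replicas: 2}, {Name: "member2", Replicas: 3}}
	desired := int32(8)

	assigned := sumOfReplicas(scheduled)
	delta := desired - assigned
	fmt.Println("replicas to place in this cycle:", delta) // 3

	// Pretend the divider assigned the delta like this.
	added := []TargetCluster{{Name: "member1", Replicas: 1}, {Name: "member3", Replicas: 2}}
	fmt.Println(mergeTargetClusters(scheduled, added))
}
```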
52 changes: 0 additions & 52 deletions pkg/scheduler/core/generic_scheduler.go
@@ -5,7 +5,6 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -21,7 +20,6 @@ import (
// ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters.
type ScheduleAlgorithm interface {
Schedule(context.Context, *policyv1alpha1.Placement, *workv1alpha2.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
FailoverSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha2.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
}

// ScheduleResult includes the clusters selected.
@@ -246,53 +244,3 @@ func (g *genericScheduler) assignReplicas(
}
return targetClusters, nil
}

func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *policyv1alpha1.Placement,
spec *workv1alpha2.ResourceBindingSpec) (result ScheduleResult, err error) {
readyClusters := g.schedulerCache.Snapshot().GetReadyClusterNames()
totalClusters := util.ConvertToClusterNames(spec.Clusters)

reservedClusters := calcReservedCluster(totalClusters, readyClusters)
availableClusters := calcAvailableCluster(totalClusters, readyClusters)

candidateClusters := sets.NewString()
for clusterName := range availableClusters {
clusterObj := g.schedulerCache.Snapshot().GetCluster(clusterName)
if clusterObj == nil {
return result, fmt.Errorf("failed to get clusterObj by clusterName: %s", clusterName)
}

if result := g.scheduleFramework.RunFilterPlugins(ctx, placement, &spec.Resource, clusterObj.Cluster()); !result.IsSuccess() {
klog.V(4).Infof("cluster %q is not fit", clusterName)
} else {
candidateClusters.Insert(clusterName)
}
}

klog.V(4).Infof("Reserved bindingClusters : %v", reservedClusters.List())
klog.V(4).Infof("Candidate bindingClusters: %v", candidateClusters.List())

// TODO: should schedule as much as possible?
deltaLen := len(spec.Clusters) - len(reservedClusters)
if len(candidateClusters) < deltaLen {
// for ReplicaSchedulingTypeDivided, we will try to migrate replicas to the other health clusters
if placement.ReplicaScheduling == nil || placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
klog.Warningf("ignore reschedule binding as insufficient available cluster")
return ScheduleResult{}, nil
}
}

// TODO: check if the final result meets the spread constraints.
targetClusters := reservedClusters
clusterList := candidateClusters.List()
for i := 0; i < deltaLen && i < len(candidateClusters); i++ {
targetClusters.Insert(clusterList[i])
}

var reScheduleResult []workv1alpha2.TargetCluster
for cluster := range targetClusters {
reScheduleResult = append(reScheduleResult, workv1alpha2.TargetCluster{Name: cluster})
}

return ScheduleResult{reScheduleResult}, nil
}
10 changes: 0 additions & 10 deletions pkg/scheduler/core/util.go
@@ -120,13 +120,3 @@ func resortClusterList(clusterAvailableReplicas []workv1alpha2.TargetCluster, sc
klog.V(4).Infof("Resorted target cluster: %v", clusterAvailableReplicas)
return clusterAvailableReplicas
}

// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'.
func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
return bindClusters.Difference(bindClusters.Difference(readyClusters))
}

// calcAvailableCluster returns a list of ready clusters that not in 'bindClusters'.
func calcAvailableCluster(bindCluster, readyClusters sets.String) sets.String {
return readyClusters.Difference(bindCluster)
}
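The two deleted helpers encode plain set arithmetic: calcReservedCluster keeps the bound clusters that are still ready (the double difference is equivalent to bindClusters ∩ readyClusters), and calcAvailableCluster returns the ready clusters that are not yet bound. A small illustration using the same apimachinery sets package; the cluster names are made up.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	bound := sets.NewString("member1", "member2", "member3") // clusters in spec.Clusters
	ready := sets.NewString("member2", "member3", "member4") // clusters reported ready

	// calcReservedCluster: bound clusters that are still ready.
	// bound.Difference(bound.Difference(ready)) is equivalent to bound ∩ ready.
	reserved := bound.Difference(bound.Difference(ready))

	// calcAvailableCluster: ready clusters that are not yet bound.
	available := ready.Difference(bound)

	fmt.Println(reserved.List())  // [member2 member3]
	fmt.Println(available.List()) // [member4]
}
```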
4 changes: 4 additions & 0 deletions pkg/scheduler/event_handler.go
@@ -277,8 +277,12 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t)
return
}

klog.V(3).Infof("Delete event for cluster %s", cluster.Name)

s.enqueueAffectedBinding(cluster.Name)
s.enqueueAffectedClusterBinding(cluster.Name)

if s.enableSchedulerEstimator {
s.schedulerEstimatorWorker.Add(cluster.Name)
}
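The deleteCluster handler above now requeues every binding that targets the removed cluster so the scheduler can move its replicas elsewhere. The sketch below shows the conventional client-go delete-handler shape such a hook follows, including the DeletedFinalStateUnknown tombstone case that the "cannot convert" guard hints at; it is illustrative only, with a stand-in Cluster type, and not the exact Karmada handler.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// Cluster stands in for clusterv1alpha1.Cluster.
type Cluster struct{ Name string }

func deleteCluster(obj interface{}, enqueue func(clusterName string)) {
	var cluster *Cluster
	switch t := obj.(type) {
	case *Cluster:
		cluster = t
	case cache.DeletedFinalStateUnknown:
		// The object may arrive wrapped in a tombstone if the watch missed the delete event.
		c, ok := t.Obj.(*Cluster)
		if !ok {
			fmt.Printf("cannot convert to Cluster: %v\n", t)
			return
		}
		cluster = c
	default:
		fmt.Printf("cannot convert to Cluster: %v\n", t)
		return
	}

	// Requeue every binding that currently targets the deleted cluster so the
	// scheduler can reschedule its replicas.
	enqueue(cluster.Name)
}

func main() {
	deleteCluster(&Cluster{Name: "member1"}, func(name string) {
		fmt.Println("enqueue bindings affected by cluster", name)
	})
}
```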
85 changes: 20 additions & 65 deletions pkg/scheduler/scheduler.go
@@ -292,9 +292,10 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
klog.Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
return nil
}

if features.FeatureGate.Enabled(features.Failover) {
klog.Infof("Reschedule ResourceBinding(%s/%s) as cluster failure", namespace, name)
err = s.rescheduleResourceBinding(rb)
klog.Infof("Reschedule ResourceBinding(%s/%s) as cluster failure or deletion", namespace, name)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err)
return err
}
@@ -355,8 +356,8 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
return nil
}
if features.FeatureGate.Enabled(features.Failover) {
klog.Infof("Reschedule ClusterResourceBinding(%s) as cluster failure", name)
err = s.rescheduleClusterResourceBinding(crb)
klog.Infof("Reschedule ClusterResourceBinding(%s) as cluster failure or deletion", name)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err)
return err
}
@@ -442,73 +443,27 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
}

func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) error {
klog.V(4).InfoS("Begin rescheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
defer klog.V(4).InfoS("End rescheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))

policyName := util.GetLabelValue(clusterResourceBinding.Labels, policyv1alpha1.ClusterPropagationPolicyLabel)
policy, err := s.clusterPolicyLister.Get(policyName)
if err != nil {
klog.Errorf("Failed to get policy by policyName(%s): Error: %v", policyName, err)
return err
}
reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
if err != nil {
return err
}
if len(reScheduleResult.SuggestedClusters) == 0 {
return nil
}

clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)

_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
if err != nil {
return err
}

return nil
}

func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) error {
klog.V(4).InfoS("Begin rescheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
defer klog.V(4).InfoS("End rescheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))

placement, _, err := s.getPlacement(resourceBinding)
if err != nil {
klog.Errorf("Failed to get placement by resourceBinding(%s/%s): Error: %v", resourceBinding.Namespace, resourceBinding.Name, err)
return err
}
reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &placement, &resourceBinding.Spec)
if err != nil {
return err
}
if len(reScheduleResult.SuggestedClusters) == 0 {
return nil
}

resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)

_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
if err != nil {
return err
}

return nil
}

func (s *Scheduler) allClustersInReadyState(tcs []workv1alpha2.TargetCluster) bool {
clusters := s.schedulerCache.Snapshot().GetClusters()
for i := range tcs {
isNoExisted := true
for _, c := range clusters {
if c.Cluster().Name == tcs[i].Name {
if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
return false
}
if c.Cluster().Name != tcs[i].Name {
continue
}

isNoExisted = false
if meta.IsStatusConditionFalse(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady) ||
!c.Cluster().DeletionTimestamp.IsZero() {
return false
}

break
}

if isNoExisted {
// the target cluster was not found in the snapshot; it might have been deleted
return false
}
}
return true
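The reworked allClustersInReadyState treats a target cluster as unusable when it is missing from the snapshot, reports Ready=False, or carries a deletion timestamp, which is what lets a deleted cluster trigger rescheduling. The following sketch isolates that per-cluster check with a stand-in Cluster type; it is an assumption-labeled illustration, not the Karmada cluster API type.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Cluster stands in for clusterv1alpha1.Cluster (only the fields used here).
type Cluster struct {
	Name              string
	DeletionTimestamp *metav1.Time
	Conditions        []metav1.Condition
}

// clusterUsable mirrors the per-cluster check in allClustersInReadyState.
func clusterUsable(c Cluster, readyConditionType string) bool {
	if meta.IsStatusConditionFalse(c.Conditions, readyConditionType) {
		return false // explicitly reported not ready
	}
	if c.DeletionTimestamp != nil && !c.DeletionTimestamp.IsZero() {
		return false // cluster is being deleted, so a reschedule is needed
	}
	return true
}

func main() {
	now := metav1.NewTime(time.Now())
	deleting := Cluster{
		Name:              "member1",
		DeletionTimestamp: &now,
		Conditions:        []metav1.Condition{{Type: "Ready", Status: metav1.ConditionTrue}},
	}
	fmt.Println(clusterUsable(deleting, "Ready")) // false: deletion in progress forces a reschedule
}
```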
