This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

Merge pull request #1385 from irfanurrehman/rsp-reconcile-fix
Replica rebalance reconciliation fix
k8s-ci-robot committed Mar 24, 2021
2 parents 179511a + a2275dc commit 66abaa4
Showing 3 changed files with 65 additions and 25 deletions.
25 changes: 20 additions & 5 deletions pkg/controller/util/podanalyzer/pod_helper.go
@@ -20,6 +20,8 @@ import (
 	"time"
 
 	api_v1 "k8s.io/api/core/v1"
+
+	ctlutil "sigs.k8s.io/kubefed/pkg/controller/util"
 )
 
 type PodAnalysisResult struct {
@@ -41,8 +43,9 @@ const (
 // AnalyzePods calculates how many pods from the list are in one of
 // the meaningful (from the replica set perspective) states. This function is
 // a temporary workaround against the current lack of ownerRef in pods.
-func AnalyzePods(podList *api_v1.PodList, currentTime time.Time) PodAnalysisResult {
+func AnalyzePods(podList *api_v1.PodList, currentTime time.Time) (PodAnalysisResult, ctlutil.ReconciliationStatus) {
 	result := PodAnalysisResult{}
+	unschedulableRightNow := 0
 	for _, pod := range podList.Items {
 		result.Total++
 		for _, condition := range pod.Status.Conditions {
@@ -52,11 +55,23 @@ func AnalyzePods(podList *api_v1.PodList, currentTime time.Time) PodAnalysisResu
 				}
 			} else if condition.Type == api_v1.PodScheduled &&
 				condition.Status == api_v1.ConditionFalse &&
-				condition.Reason == api_v1.PodReasonUnschedulable &&
-				condition.LastTransitionTime.Add(UnschedulableThreshold).Before(currentTime) {
-				result.Unschedulable++
+				condition.Reason == api_v1.PodReasonUnschedulable {
+				unschedulableRightNow++
+				if condition.LastTransitionTime.Add(UnschedulableThreshold).Before(currentTime) {
+					result.Unschedulable++
+				}
 			}
 		}
 	}
-	return result
+	if unschedulableRightNow != result.Unschedulable {
+		// We get the reconcile event almost immediately after the status of a
+		// pod changes, however we will not consider the unschedulable pods as
+		// unschedulable immediately (until 60 secs), because we don't want to
+		// change state frequently (it can lead to continuously moving replicas
+		// around). We need to reconcile again after a timeout. We use the return
+		// status to indicate retry for reconcile.
+		return result, ctlutil.StatusNeedsRecheck
+	}
+
+	return result, ctlutil.StatusAllOK
 }
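
With this change, AnalyzePods reports both the pod counts and a reconciliation status, so a caller can distinguish pods that have been unschedulable past the 60-second threshold from pods that only just became unschedulable. Below is a minimal sketch of how a caller might consume the new pair; the package name and the countReadyPods helper are illustrative, not part of this commit:

package example

import (
	"fmt"
	"time"

	api_v1 "k8s.io/api/core/v1"

	ctlutil "sigs.k8s.io/kubefed/pkg/controller/util"
	"sigs.k8s.io/kubefed/pkg/controller/util/podanalyzer"
)

// countReadyPods is a hypothetical helper showing how to react to the
// two-value return introduced above.
func countReadyPods(podList *api_v1.PodList) (int, bool) {
	result, status := podanalyzer.AnalyzePods(podList, time.Now())
	// StatusNeedsRecheck means some pods are unschedulable right now but have
	// not yet crossed UnschedulableThreshold, so the caller should requeue and
	// look again later rather than rebalancing immediately.
	needsRecheck := status == ctlutil.StatusNeedsRecheck
	fmt.Printf("total=%d ready=%d unschedulable=%d recheck=%v\n",
		result.Total, result.RunningAndReady, result.Unschedulable, needsRecheck)
	return result.RunningAndReady, needsRecheck
}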
25 changes: 21 additions & 4 deletions pkg/controller/util/podanalyzer/pod_helper_test.go
@@ -24,6 +24,8 @@ import (
 
 	api_v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	ctlutil "sigs.k8s.io/kubefed/pkg/controller/util"
 )
 
 func TestAnalyze(t *testing.T) {
@@ -38,7 +40,7 @@ func TestAnalyze(t *testing.T) {
 				},
 			},
 		})
-	podUnschedulable := newPod("pU",
+	podUnschedulableTimeElapsed := newPod("pU",
 		api_v1.PodStatus{
 			Phase: api_v1.PodPending,
 			Conditions: []api_v1.PodCondition{
@@ -50,25 +52,40 @@
 				},
 			},
 		})
+	podUnschedulableRightNow := newPod("pU",
+		api_v1.PodStatus{
+			Phase: api_v1.PodPending,
+			Conditions: []api_v1.PodCondition{
+				{
+					Type: api_v1.PodScheduled,
+					Status: api_v1.ConditionFalse,
+					Reason: api_v1.PodReasonUnschedulable,
+					LastTransitionTime: metav1.Time{Time: now},
+				},
+			},
+		})
 	podOther := newPod("pO",
 		api_v1.PodStatus{
 			Phase: api_v1.PodPending,
 			Conditions: []api_v1.PodCondition{},
 		})
 
-	result := AnalyzePods(&api_v1.PodList{Items: []api_v1.Pod{*podRunning, *podRunning, *podRunning, *podUnschedulable, *podUnschedulable}}, now)
+	result, status := AnalyzePods(&api_v1.PodList{Items: []api_v1.Pod{*podRunning, *podRunning,
+		*podRunning, *podUnschedulableTimeElapsed, *podUnschedulableTimeElapsed, *podUnschedulableRightNow}}, now)
 	assert.Equal(t, PodAnalysisResult{
-		Total: 5,
+		Total: 6,
 		RunningAndReady: 3,
 		Unschedulable: 2,
 	}, result)
+	assert.Equal(t, status, ctlutil.StatusNeedsRecheck)
 
-	result = AnalyzePods(&api_v1.PodList{Items: []api_v1.Pod{*podOther}}, now)
+	result, status = AnalyzePods(&api_v1.PodList{Items: []api_v1.Pod{*podOther}}, now)
 	assert.Equal(t, PodAnalysisResult{
 		Total: 1,
 		RunningAndReady: 0,
 		Unschedulable: 0,
 	}, result)
+	assert.Equal(t, status, ctlutil.StatusAllOK)
 }
 
 func newPod(name string, status api_v1.PodStatus) *api_v1.Pod {
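
The updated test now exercises all three pod shapes: running and ready, unschedulable past the threshold, and unschedulable right now. Assuming a local checkout of the kubefed repository, it can be run on its own with the standard Go tooling:

go test ./pkg/controller/util/podanalyzer/ -run TestAnalyze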
40 changes: 24 additions & 16 deletions pkg/schedulingtypes/replicascheduler.go
@@ -190,7 +190,7 @@ func (s *ReplicaScheduler) Reconcile(obj pkgruntime.Object, qualifiedName ctluti
 	}
 
 	key := qualifiedName.String()
-	result, err := s.GetSchedulingResult(rsp, qualifiedName, clusterNames)
+	result, status, err := s.GetSchedulingResult(rsp, qualifiedName, clusterNames)
 	if err != nil {
 		runtime.HandleError(errors.Wrapf(err, "Failed to compute the schedule information while reconciling RSP named %q", key))
 		return ctlutil.StatusError
@@ -202,7 +202,7 @@ func (s *ReplicaScheduler) Reconcile(obj pkgruntime.Object, qualifiedName ctluti
 		return ctlutil.StatusError
 	}
 
-	return ctlutil.StatusAllOK
+	return status
 }
 
 // The list of clusters could come from any target informer
@@ -219,7 +219,8 @@ func (s *ReplicaScheduler) clusterNames() ([]string, error) {
 	return clusterNames, nil
 }
 
-func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSchedulingPreference, qualifiedName ctlutil.QualifiedName, clusterNames []string) (map[string]int64, error) {
+func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSchedulingPreference,
+	qualifiedName ctlutil.QualifiedName, clusterNames []string) (map[string]int64, ctlutil.ReconciliationStatus, error) {
 	key := qualifiedName.String()
 
 	objectGetter := func(clusterName, key string) (interface{}, bool, error) {
@@ -250,9 +251,9 @@ func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSch
 		return podList, nil
 	}
 
-	currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
+	currentReplicasPerCluster, estimatedCapacity, status, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
 	if err != nil {
-		return nil, err
+		return nil, status, err
 	}
 
 	// TODO: Move this to API defaulting logic
@@ -263,7 +264,11 @@ func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSch
 	}
 
 	plnr := planner.NewPlanner(rsp)
-	return schedule(plnr, key, clusterNames, currentReplicasPerCluster, estimatedCapacity)
+	scheduleResult, err := schedule(plnr, key, clusterNames, currentReplicasPerCluster, estimatedCapacity)
+	if err != nil {
+		return nil, status, err
+	}
+	return scheduleResult, status, err
 }
 
 func schedule(planner *planner.Planner, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) (map[string]int64, error) {
@@ -311,14 +316,16 @@ func clustersReplicaState(
 	clusterNames []string,
 	key string,
 	objectGetter func(clusterName string, key string) (interface{}, bool, error),
-	podsGetter func(clusterName string, obj *unstructured.Unstructured) (*corev1.PodList, error)) (currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64, err error) {
+	podsGetter func(clusterName string, obj *unstructured.Unstructured) (*corev1.PodList, error)) (
+	currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64,
+	status ctlutil.ReconciliationStatus, err error) {
 	currentReplicasPerCluster = make(map[string]int64)
 	estimatedCapacity = make(map[string]int64)
 
 	for _, clusterName := range clusterNames {
 		obj, exists, err := objectGetter(clusterName, key)
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, status, err
 		}
 		if !exists {
 			continue
@@ -327,14 +334,14 @@
 		unstructuredObj := obj.(*unstructured.Unstructured)
 		replicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "spec", "replicas")
 		if err != nil {
-			return nil, nil, errors.Wrap(err, "Error retrieving 'replicas' field")
+			return nil, nil, status, errors.Wrap(err, "Error retrieving 'replicas' field")
 		}
 		if !ok {
 			replicas = int64(0)
 		}
-		readyReplicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "status", "readyreplicas")
+		readyReplicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "status", "readyReplicas")
 		if err != nil {
-			return nil, nil, errors.Wrap(err, "Error retrieving 'readyreplicas' field")
+			return nil, nil, status, errors.Wrap(err, "Error retrieving 'readyreplicas' field")
 		}
 		if !ok {
 			readyReplicas = int64(0)
@@ -346,16 +353,17 @@
 			currentReplicasPerCluster[clusterName] = int64(0)
 			podList, err := podsGetter(clusterName, unstructuredObj)
 			if err != nil {
-				return nil, nil, err
+				return nil, nil, status, err
 			}
 
-			podStatus := podanalyzer.AnalyzePods(podList, time.Now())
-			currentReplicasPerCluster[clusterName] = int64(podStatus.RunningAndReady) // include pending as well?
-			unschedulable := int64(podStatus.Unschedulable)
+			podResult := podanalyzer.PodAnalysisResult{}
+			podResult, status = podanalyzer.AnalyzePods(podList, time.Now())
+			currentReplicasPerCluster[clusterName] = int64(podResult.RunningAndReady) // include pending as well?
+			unschedulable := int64(podResult.Unschedulable)
 			if unschedulable > 0 {
 				estimatedCapacity[clusterName] = replicas - unschedulable
 			}
 		}
 	}
-	return currentReplicasPerCluster, estimatedCapacity, nil
+	return currentReplicasPerCluster, estimatedCapacity, status, nil
 }
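
The scheduler now propagates the ReconciliationStatus from pod analysis all the way out of Reconcile, so StatusNeedsRecheck can trigger another pass once the unschedulable threshold has elapsed. The sketch below is a generic illustration of that idea, not kubefed's actual worker; the ReconciliationStatus values and the requeueAfter helper are stand-ins defined here for the example:

package example

import "time"

// ReconciliationStatus and its values are illustrative stand-ins for the
// kubefed constants referenced in the diff above.
type ReconciliationStatus int

const (
	StatusAllOK ReconciliationStatus = iota
	StatusNeedsRecheck
	StatusError
)

// requeueAfter is a hypothetical dispatcher showing the intended effect of
// returning StatusNeedsRecheck from Reconcile: schedule another reconcile
// after a delay instead of treating the object as settled.
func requeueAfter(status ReconciliationStatus, enqueue func(delay time.Duration)) {
	switch status {
	case StatusNeedsRecheck:
		// Pods counted as unschedulable "right now" have not crossed the
		// 60-second threshold yet; look again shortly.
		enqueue(time.Minute)
	case StatusError:
		// Back off briefly before retrying a failed reconcile.
		enqueue(10 * time.Second)
	case StatusAllOK:
		// Nothing to do until the next watch event.
	}
}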

