Skip to content

Commit

Permalink
podtopologyspread: scheduler queueing hints
Browse files Browse the repository at this point in the history
  • Loading branch information
nayihz committed Dec 6, 2023
1 parent 55f2bc1 commit 75434ce
Showing 1 changed file with 120 additions and 2 deletions.
122 changes: 120 additions & 2 deletions pkg/scheduler/framework/plugins/podtopologyspread/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ package podtopologyspread
import (
"context"
"fmt"
"reflect"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)

const (
Expand Down Expand Up @@ -66,6 +70,10 @@ type PodTopologySpread struct {
enableMinDomainsInPodTopologySpread bool
enableNodeInclusionPolicyInPodTopologySpread bool
enableMatchLabelKeysInPodTopologySpread bool

// logger is only meant to be used by background activities which don't
// have some other logger in their parent callstack.
logger klog.Logger
}

var _ framework.PreFilterPlugin = &PodTopologySpread{}
Expand Down Expand Up @@ -141,9 +149,119 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint
// an unschedulable Pod schedulable.
// - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints,
// deleting an existing Pod may make it schedulable.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
// Node add|delete|updateLabel maybe lead an topology key changed,
// and make these pod in scheduling schedulable or unschedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
}
}

func (pl *PodTopologySpread) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
pl.logger.Error(err, "unexpected objects in isSchedulableAfterPodChange", "oldObj", oldObj, "newObj", newObj)
return framework.Queue, err
}

constraints, err := pl.getConstraints(pod)
if err != nil {
pl.logger.Error(err, "get constraints from pod failed:", "pod", klog.KObj(pod))
return framework.Queue, err
}

if modifiedPod != nil && originalPod != nil && !reflect.DeepEqual(modifiedPod.Labels, originalPod.Labels) {
for _, c := range constraints {
if c.Selector.Matches(labels.Set(originalPod.Labels)) || c.Selector.Matches(labels.Set(modifiedPod.Labels)) {
return framework.Queue, nil
}
}
return framework.QueueSkip, nil
}

if originalPod != nil {
for _, c := range constraints {
if c.Selector.Matches(labels.Set(originalPod.Labels)) {
return framework.Queue, nil
}
}
}

if modifiedPod != nil {
for _, c := range constraints {
if c.Selector.Matches(labels.Set(modifiedPod.Labels)) {
return framework.Queue, nil
}
}
}

return framework.QueueSkip, nil
}

func (pl *PodTopologySpread) getConstraints(pod *v1.Pod) ([]topologySpreadConstraint, error) {
var constraints []topologySpreadConstraint
var err error
if len(pod.Spec.TopologySpreadConstraints) > 0 {
// We have feature gating in APIServer to strip the spec
// so don't need to re-check feature gate, just check length of Constraints.
constraints, err = pl.filterTopologySpreadConstraints(
pod.Spec.TopologySpreadConstraints,
pod.Labels,
v1.DoNotSchedule,
)
if err != nil {
return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %w", err)
}
} else {
constraints, err = pl.buildDefaultConstraints(pod, v1.DoNotSchedule)
if err != nil {
return nil, fmt.Errorf("setting default hard topology spread constraints: %w", err)
}
}
return constraints, nil
}

func (pl *PodTopologySpread) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
if err != nil {
pl.logger.Error(err, "unexpected objects in isSchedulableAfterNodeChange", "oldObj", oldObj, "newObj", newObj)
return framework.Queue, err
}

constraints, err := pl.getConstraints(pod)
if err != nil {
pl.logger.Error(err, "get constraints from pod failed:", "pod", klog.KObj(pod))
return framework.Queue, err
}

// framework.UpdateNodeLabel: return Queue when topologyKey is added/removed in node's labels, else return QueueSkip.
if originalNode != nil && modifiedNode != nil {
pl.logger.V(4).Info("node was updated", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
if nodeLabelsMatchSpreadConstraints(originalNode.Labels, constraints) {
return framework.Queue, nil
}
if nodeLabelsMatchSpreadConstraints(modifiedNode.Labels, constraints) {
return framework.Queue, nil
}
return framework.QueueSkip, nil
}

// framework.Add: return Queue when node has topologyKey in its labels, else return QueueSkip.
if modifiedNode != nil {
pl.logger.V(4).Info("node was created", "pod", klog.KObj(pod))
if !nodeLabelsMatchSpreadConstraints(modifiedNode.Labels, constraints) {
return framework.QueueSkip, nil
}
return framework.Queue, nil
}

// framework.Delete: return Queue when node has topologyKey in its labels, else return QueueSkip.
if originalNode != nil {
pl.logger.V(4).Info("node was deleted", "pod", klog.KObj(pod))
if !nodeLabelsMatchSpreadConstraints(originalNode.Labels, constraints) {
return framework.QueueSkip, nil
}
return framework.Queue, nil
}

return framework.QueueSkip, nil
}

0 comments on commit 75434ce

Please sign in to comment.