Merge pull request #85157 from alculquicondor/refactor/selector
Store topology spread constraints in metadata with labels.Selector
k8s-ci-robot committed Nov 13, 2019
2 parents f5df681 + e3bdb4b commit 209c025
Showing 7 changed files with 336 additions and 253 deletions.
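The heart of the refactor: each hard constraint's LabelSelector is now parsed into a labels.Selector once, when the scheduling metadata is built, instead of once per existing pod via PodMatchesSpreadConstraint. Below is a minimal, self-contained sketch of that parse-once/match-many idea — the constraint values are made up for illustration, and this is not the scheduler's own code:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// A hard spread constraint as it arrives in the pod spec.
	c := v1.TopologySpreadConstraint{
		MaxSkew:           1,
		TopologyKey:       "zone",
		WhenUnsatisfiable: v1.DoNotSchedule,
		LabelSelector: &metav1.LabelSelector{
			MatchLabels: map[string]string{"app": "web"},
		},
	}

	// Parse the selector once, up front. Any parse error surfaces here,
	// not inside the per-node workers.
	selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
	if err != nil {
		panic(err)
	}

	// Matching against pod labels is now a cheap, error-free call.
	fmt.Println(selector.Matches(labels.Set{"app": "web"})) // true
	fmt.Println(selector.Matches(labels.Set{"app": "db"}))  // false
}
```

This is also why the diff below can drop the error channel: with parsing hoisted out of the hot path, the per-pod check cannot fail, so errCh, the cancelable context, and the error returns on addPod/removePod/updatePod all disappear.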
139 changes: 63 additions & 76 deletions pkg/scheduler/algorithm/predicates/metadata.go
@@ -116,6 +116,7 @@ func (paths *criticalPaths) update(tpVal string, num int32) {
 // (1) critical paths where the least pods are matched on each spread constraint.
 // (2) number of pods matched on each spread constraint.
 type evenPodsSpreadMetadata struct {
+	constraints []topologySpreadConstraint
 	// We record 2 critical paths instead of all critical paths here.
 	// criticalPaths[0].matchNum always holds the minimum matching number.
 	// criticalPaths[1].matchNum is always greater or equal to criticalPaths[0].matchNum, but
@@ -125,6 +126,15 @@ type evenPodsSpreadMetadata struct {
 	tpPairToMatchNum map[topologyPair]int32
 }
 
+// topologySpreadConstraint is an internal version for a hard (DoNotSchedule
+// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
+// selector is parsed.
+type topologySpreadConstraint struct {
+	maxSkew     int32
+	topologyKey string
+	selector    labels.Selector
+}
+
 type serviceAffinityMetadata struct {
 	matchingPodList     []*v1.Pod
 	matchingPodServices []*v1.Service
@@ -420,17 +430,20 @@ func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo,
 func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) {
 	// We have feature gating in APIServer to strip the spec
 	// so don't need to re-check feature gate, just check length of constraints.
-	constraints := getHardTopologySpreadConstraints(pod)
+	constraints, err := filterHardTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
+	if err != nil {
+		return nil, err
+	}
 	if len(constraints) == 0 {
 		return nil, nil
 	}
 
-	errCh := schedutil.NewErrorChannel()
 	var lock sync.Mutex
 
 	// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
 	// In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
 	m := evenPodsSpreadMetadata{
+		constraints:          constraints,
 		tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
 		tpPairToMatchNum:     make(map[topologyPair]int32),
 	}
@@ -440,8 +453,6 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
 		lock.Unlock()
 	}
 
-	ctx, cancel := context.WithCancel(context.Background())
-
 	processNode := func(i int) {
 		nodeInfo := allNodes[i]
 		node := nodeInfo.Node()
@@ -466,28 +477,19 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
 				if existingPod.Namespace != pod.Namespace {
 					continue
 				}
-				ok, err := PodMatchesSpreadConstraint(existingPod.Labels, constraint)
-				if err != nil {
-					errCh.SendErrorWithCancel(err, cancel)
-					return
-				}
-				if ok {
+				if constraint.selector.Matches(labels.Set(existingPod.Labels)) {
 					matchTotal++
 				}
 			}
-			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
+			pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
 			addTopologyPairMatchNum(pair, matchTotal)
 		}
 	}
-	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
-
-	if err := errCh.ReceiveError(); err != nil {
-		return nil, err
-	}
+	workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
 
 	// calculate min match for each topology pair
 	for i := 0; i < len(constraints); i++ {
-		key := constraints[i].TopologyKey
+		key := constraints[i].topologyKey
 		m.tpKeyToCriticalPaths[key] = newCriticalPaths()
 	}
 	for pair, num := range m.tpPairToMatchNum {
@@ -497,36 +499,28 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
 	return &m, nil
 }
 
-func getHardTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpreadConstraint) {
-	if pod != nil {
-		for _, constraint := range pod.Spec.TopologySpreadConstraints {
-			if constraint.WhenUnsatisfiable == v1.DoNotSchedule {
-				constraints = append(constraints, constraint)
-			}
-		}
-	}
-	return
-}
-
-// PodMatchesSpreadConstraint verifies if <constraint.LabelSelector> matches <podLabelSet>.
-// Some corner cases:
-// 1. podLabelSet = nil => returns (false, nil)
-// 2. constraint.LabelSelector = nil => returns (false, nil)
-func PodMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySpreadConstraint) (bool, error) {
-	selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
-	if err != nil {
-		return false, err
-	}
-	if !selector.Matches(podLabelSet) {
-		return false, nil
-	}
-	return true, nil
-}
+func filterHardTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
+	var result []topologySpreadConstraint
+	for _, c := range constraints {
+		if c.WhenUnsatisfiable == v1.DoNotSchedule {
+			selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
+			if err != nil {
+				return nil, err
+			}
+			result = append(result, topologySpreadConstraint{
+				maxSkew:     c.MaxSkew,
+				topologyKey: c.TopologyKey,
+				selector:    selector,
+			})
+		}
+	}
+	return result, nil
+}
 
 // NodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
-func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []v1.TopologySpreadConstraint) bool {
-	for _, constraint := range constraints {
-		if _, ok := nodeLabels[constraint.TopologyKey]; !ok {
+func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
+	for _, c := range constraints {
+		if _, ok := nodeLabels[c.topologyKey]; !ok {
 			return false
 		}
 	}
@@ -581,57 +575,55 @@ func (m *topologyPairsMaps) clone() *topologyPairsMaps {
 	return copy
 }
 
-func (c *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) error {
-	return c.updatePod(addedPod, preemptorPod, node, 1)
+func (m *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) {
+	m.updatePod(addedPod, preemptorPod, node, 1)
 }
 
-func (c *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) error {
-	return c.updatePod(deletedPod, preemptorPod, node, -1)
+func (m *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) {
+	m.updatePod(deletedPod, preemptorPod, node, -1)
 }
 
-func (c *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) error {
-	if updatedPod.Namespace != preemptorPod.Namespace || node == nil {
-		return nil
+func (m *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) {
+	if m == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
+		return
 	}
-	constraints := getHardTopologySpreadConstraints(preemptorPod)
-	if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
-		return nil
+	if !NodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
+		return
 	}
 
 	podLabelSet := labels.Set(updatedPod.Labels)
-	for _, constraint := range constraints {
-		if match, err := PodMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
-			return err
-		} else if !match {
+	for _, constraint := range m.constraints {
+		if !constraint.selector.Matches(podLabelSet) {
 			continue
 		}
 
-		k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey]
+		k, v := constraint.topologyKey, node.Labels[constraint.topologyKey]
 		pair := topologyPair{key: k, value: v}
-		c.tpPairToMatchNum[pair] = c.tpPairToMatchNum[pair] + delta
+		m.tpPairToMatchNum[pair] = m.tpPairToMatchNum[pair] + delta
 
-		c.tpKeyToCriticalPaths[k].update(v, c.tpPairToMatchNum[pair])
+		m.tpKeyToCriticalPaths[k].update(v, m.tpPairToMatchNum[pair])
 	}
-	return nil
 }
 
-func (c *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
+func (m *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
 	// c could be nil when EvenPodsSpread feature is disabled
-	if c == nil {
+	if m == nil {
 		return nil
 	}
-	copy := evenPodsSpreadMetadata{
-		tpKeyToCriticalPaths: make(map[string]*criticalPaths),
-		tpPairToMatchNum:     make(map[topologyPair]int32),
+	cp := evenPodsSpreadMetadata{
+		// constraints are shared because they don't change.
+		constraints:          m.constraints,
+		tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(m.tpKeyToCriticalPaths)),
+		tpPairToMatchNum:     make(map[topologyPair]int32, len(m.tpPairToMatchNum)),
 	}
-	for tpKey, paths := range c.tpKeyToCriticalPaths {
-		copy.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
+	for tpKey, paths := range m.tpKeyToCriticalPaths {
+		cp.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
 	}
-	for tpPair, matchNum := range c.tpPairToMatchNum {
+	for tpPair, matchNum := range m.tpPairToMatchNum {
 		copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
-		copy.tpPairToMatchNum[copyPair] = matchNum
+		cp.tpPairToMatchNum[copyPair] = matchNum
 	}
-	return &copy
+	return &cp
 }
 
 // RemovePod changes predicateMetadata assuming that the given `deletedPod` is
@@ -642,10 +634,7 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) erro
 		return fmt.Errorf("deletedPod and meta.pod must not be the same")
 	}
 	meta.podAffinityMetadata.removePod(deletedPod)
-	// Delete pod from the pod spread topology maps.
-	if err := meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node); err != nil {
-		return err
-	}
+	meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node)
	meta.serviceAffinityMetadata.removePod(deletedPod, node)
 
 	return nil
@@ -667,9 +656,7 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
 	}
 	// Update meta.evenPodsSpreadMetadata if meta.pod has hard spread constraints
 	// and addedPod matches that
-	if err := meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node); err != nil {
-		return err
-	}
+	meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node)
 
 	meta.serviceAffinityMetadata.addPod(addedPod, meta.pod, node)
 

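With selectors pre-parsed and stored in the metadata, incremental preemption accounting reduces to signed-delta bookkeeping: addPod and removePod both call updatePod, which adds +1 or -1 to a per-topology-pair counter for every constraint whose selector matches the pod. A toy sketch of that bookkeeping follows — the names mirror the diff, but this is an illustration, not the scheduler's code, and the criticalPaths maintenance is omitted:

```go
package main

import "fmt"

// topologyPair mirrors the map key used by tpPairToMatchNum in the diff.
type topologyPair struct {
	key   string // topology key, e.g. "zone"
	value string // node's label value, e.g. "zone-a"
}

// updatePair applies a signed delta, as updatePod does for each constraint
// whose selector matches the updated pod.
func updatePair(matchNum map[topologyPair]int32, pair topologyPair, delta int32) {
	matchNum[pair] += delta
}

func main() {
	matchNum := map[topologyPair]int32{}
	pair := topologyPair{key: "zone", value: "zone-a"}

	updatePair(matchNum, pair, 1)  // addPod: a matching pod is placed in zone-a
	updatePair(matchNum, pair, -1) // removePod: the pod is removed again
	fmt.Println(matchNum[pair])    // 0
}
```

clone() then only needs to deep-copy these counters; the parsed constraints themselves are shared between copies, which is safe because they are never mutated after filterHardTopologySpreadConstraints builds them.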