Store topology spread constraints in metadata with labels.Selector #85157
@@ -116,6 +116,7 @@ func (paths *criticalPaths) update(tpVal string, num int32) {
 // (1) critical paths where the least pods are matched on each spread constraint.
 // (2) number of pods matched on each spread constraint.
 type evenPodsSpreadMetadata struct {
+	constraints []topologySpreadConstraint
 	// We record 2 critical paths instead of all critical paths here.
 	// criticalPaths[0].matchNum always holds the minimum matching number.
 	// criticalPaths[1].matchNum is always greater or equal to criticalPaths[0].matchNum, but
@@ -125,6 +126,15 @@ type evenPodsSpreadMetadata struct {
 	tpPairToMatchNum map[topologyPair]int32
 }
 
+// topologySpreadConstraint is an internal version for a hard (DoNotSchedule
+// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
+// selector is parsed.
+type topologySpreadConstraint struct {
+	maxSkew     int32
+	topologyKey string
+	selector    labels.Selector
+}
+
 type serviceAffinityMetadata struct {
 	matchingPodList     []*v1.Pod
 	matchingPodServices []*v1.Service
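The new selector field holds a pre-parsed labels.Selector instead of the API type's metav1.LabelSelector, so the parse (which can fail) happens once per constraint rather than once per pod comparison. A minimal standalone sketch of the two-step API the field relies on; the example labels are made up:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// metav1.LabelSelector is the serialized API form, as found in
	// v1.TopologySpreadConstraint.LabelSelector.
	ls := &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}}

	// Parse once; this is the step the PR hoists out of the per-pod loop.
	selector, err := metav1.LabelSelectorAsSelector(ls)
	if err != nil {
		panic(err)
	}

	// Matching against pod labels is then a cheap in-memory check.
	fmt.Println(selector.Matches(labels.Set{"app": "web"})) // true
	fmt.Println(selector.Matches(labels.Set{"app": "db"}))  // false
}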
@@ -420,17 +430,20 @@ func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo,
 func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) {
 	// We have feature gating in APIServer to strip the spec
 	// so don't need to re-check feature gate, just check length of constraints.
-	constraints := getHardTopologySpreadConstraints(pod)
+	constraints, err := filterHardTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
+	if err != nil {
+		return nil, err
+	}
 	if len(constraints) == 0 {
 		return nil, nil
 	}
 
-	errCh := schedutil.NewErrorChannel()
 	var lock sync.Mutex
 
 	// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
 	// In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
 	m := evenPodsSpreadMetadata{
+		constraints:          constraints,
 		tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
 		tpPairToMatchNum:     make(map[topologyPair]int32),
 	}
@@ -440,8 +453,6 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
 		lock.Unlock()
 	}
 
-	ctx, cancel := context.WithCancel(context.Background())
-
 	processNode := func(i int) {
 		nodeInfo := allNodes[i]
 		node := nodeInfo.Node()
@@ -466,28 +477,19 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
 				if existingPod.Namespace != pod.Namespace {
 					continue
 				}
-				ok, err := PodMatchesSpreadConstraint(existingPod.Labels, constraint)
-				if err != nil {
-					errCh.SendErrorWithCancel(err, cancel)
-					return
-				}
-				if ok {
+				if constraint.selector.Matches(labels.Set(existingPod.Labels)) {
 					matchTotal++
 				}
 			}
-			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
+			pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
 			addTopologyPairMatchNum(pair, matchTotal)
 		}
 	}
-	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
-
-	if err := errCh.ReceiveError(); err != nil {
-		return nil, err
-	}
+	workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
 
 	// calculate min match for each topology pair
 	for i := 0; i < len(constraints); i++ {
-		key := constraints[i].TopologyKey
+		key := constraints[i].topologyKey
 		m.tpKeyToCriticalPaths[key] = newCriticalPaths()
 	}
 	for pair, num := range m.tpPairToMatchNum {
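Dropping the error channel is what lets ParallelizeUntil run under a plain context.Background(): once selectors are parsed up front, nothing inside processNode can fail anymore, so there is no error to propagate and no reason to cancel sibling workers. A standalone sketch of the same fan-out pattern, assuming toy data and a hypothetical counts map:

package main

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := []string{"node-a", "node-b", "node-c"} // hypothetical node names
	var lock sync.Mutex
	counts := make(map[string]int)

	// Each call handles one index; shared-map writes are serialized with the
	// mutex, mirroring addTopologyPairMatchNum in the patch above.
	processNode := func(i int) {
		lock.Lock()
		counts[nodes[i]]++
		lock.Unlock()
	}

	// 16 workers over len(nodes) pieces; no cancellation is needed because
	// the work function cannot fail.
	workqueue.ParallelizeUntil(context.Background(), 16, len(nodes), processNode)
	fmt.Println(counts)
}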
@@ -497,36 +499,28 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
 	return &m, nil
 }
 
-func getHardTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpreadConstraint) {
-	if pod != nil {
-		for _, constraint := range pod.Spec.TopologySpreadConstraints {
-			if constraint.WhenUnsatisfiable == v1.DoNotSchedule {
-				constraints = append(constraints, constraint)
+func filterHardTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
+	var result []topologySpreadConstraint
+	for _, c := range constraints {
+		if c.WhenUnsatisfiable == v1.DoNotSchedule {
+			selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
Review thread on the metav1.LabelSelectorAsSelector call:

> It seems this is the key perf improvement. A follow-up is to do a thorough check of repeated usage of LabelSelectorAsSelector and do similar refactoring. I recall there was a PR using the same rationale to improve the performance of PodAffinity, probably #79465. We should consolidate this.

> Sounds like a suggestion. cc @liu-cong, who is looking into performance in general.

> We should run some benchmarks first, but we could annotate the pods in the scheduler cache with already-processed selectors for affinity and spread constraints, so that we do this conversion per pod rather than per pod per cycle.

> SG :) I was discussing with ahg-g yesterday about what can be cached per pod, and this seems to fall into that category.
+			if err != nil {
+				return nil, err
+			}
+			result = append(result, topologySpreadConstraint{
+				maxSkew:     c.MaxSkew,
+				topologyKey: c.TopologyKey,
+				selector:    selector,
+			})
 		}
 	}
-	return
-}
-
-// PodMatchesSpreadConstraint verifies if <constraint.LabelSelector> matches <podLabelSet>.
-// Some corner cases:
-// 1. podLabelSet = nil => returns (false, nil)
-// 2. constraint.LabelSelector = nil => returns (false, nil)
-func PodMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySpreadConstraint) (bool, error) {
-	selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
-	if err != nil {
-		return false, err
-	}
-	if !selector.Matches(podLabelSet) {
-		return false, nil
-	}
-	return true, nil
+	return result, nil
 }
 
 // NodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
-func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []v1.TopologySpreadConstraint) bool {
-	for _, constraint := range constraints {
-		if _, ok := nodeLabels[constraint.TopologyKey]; !ok {
+func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
+	for _, c := range constraints {
+		if _, ok := nodeLabels[c.topologyKey]; !ok {
 			return false
 		}
 	}
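For context, the per-pod caching floated in the review thread above might look like the following hypothetical sketch; cachedPodInfo and its fields are illustrative names, not the scheduler's actual cache types:

package schedulercache // hypothetical package, illustration only

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// cachedPodInfo (hypothetical) keeps pre-parsed spread selectors next to the
// pod so that LabelSelectorAsSelector runs once per pod add/update, rather
// than once per pod per scheduling cycle.
type cachedPodInfo struct {
	pod             *v1.Pod
	spreadSelectors []labels.Selector
}

func newCachedPodInfo(pod *v1.Pod) (*cachedPodInfo, error) {
	c := &cachedPodInfo{pod: pod}
	for _, tsc := range pod.Spec.TopologySpreadConstraints {
		s, err := metav1.LabelSelectorAsSelector(tsc.LabelSelector)
		if err != nil {
			return nil, err
		}
		c.spreadSelectors = append(c.spreadSelectors, s)
	}
	return c, nil
}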
@@ -581,57 +575,55 @@ func (m *topologyPairsMaps) clone() *topologyPairsMaps {
 	return copy
 }
 
-func (c *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) error {
-	return c.updatePod(addedPod, preemptorPod, node, 1)
+func (m *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) {
+	m.updatePod(addedPod, preemptorPod, node, 1)
 }
 
-func (c *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) error {
-	return c.updatePod(deletedPod, preemptorPod, node, -1)
+func (m *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) {
+	m.updatePod(deletedPod, preemptorPod, node, -1)
 }
 
-func (c *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) error {
-	if updatedPod.Namespace != preemptorPod.Namespace || node == nil {
-		return nil
+func (m *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) {
+	if m == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
+		return
 	}
-	constraints := getHardTopologySpreadConstraints(preemptorPod)
-	if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
-		return nil
+	if !NodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
+		return
 	}
 
 	podLabelSet := labels.Set(updatedPod.Labels)
-	for _, constraint := range constraints {
-		if match, err := PodMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
-			return err
-		} else if !match {
+	for _, constraint := range m.constraints {
+		if !constraint.selector.Matches(podLabelSet) {
 			continue
 		}
-		k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey]
+		k, v := constraint.topologyKey, node.Labels[constraint.topologyKey]
 		pair := topologyPair{key: k, value: v}
-		c.tpPairToMatchNum[pair] = c.tpPairToMatchNum[pair] + delta
-		c.tpKeyToCriticalPaths[k].update(v, c.tpPairToMatchNum[pair])
+		m.tpPairToMatchNum[pair] = m.tpPairToMatchNum[pair] + delta
+		m.tpKeyToCriticalPaths[k].update(v, m.tpPairToMatchNum[pair])
 	}
-	return nil
 }
 
-func (c *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
+func (m *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
 	// c could be nil when EvenPodsSpread feature is disabled
-	if c == nil {
+	if m == nil {
 		return nil
 	}
-	copy := evenPodsSpreadMetadata{
-		tpKeyToCriticalPaths: make(map[string]*criticalPaths),
-		tpPairToMatchNum:     make(map[topologyPair]int32),
+	cp := evenPodsSpreadMetadata{
+		// constraints are shared because they don't change.
+		constraints:          m.constraints,
+		tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(m.tpKeyToCriticalPaths)),
+		tpPairToMatchNum:     make(map[topologyPair]int32, len(m.tpPairToMatchNum)),
 	}
-	for tpKey, paths := range c.tpKeyToCriticalPaths {
-		copy.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
+	for tpKey, paths := range m.tpKeyToCriticalPaths {
+		cp.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
 	}
-	for tpPair, matchNum := range c.tpPairToMatchNum {
+	for tpPair, matchNum := range m.tpPairToMatchNum {
 		copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
-		copy.tpPairToMatchNum[copyPair] = matchNum
+		cp.tpPairToMatchNum[copyPair] = matchNum
 	}
-	return &copy
+	return &cp
 }
 
 // RemovePod changes predicateMetadata assuming that the given `deletedPod` is
@@ -642,10 +634,7 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) erro
 		return fmt.Errorf("deletedPod and meta.pod must not be the same")
 	}
 	meta.podAffinityMetadata.removePod(deletedPod)
-	// Delete pod from the pod spread topology maps.
-	if err := meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node); err != nil {
-		return err
-	}
+	meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node)
 	meta.serviceAffinityMetadata.removePod(deletedPod, node)
 
 	return nil
@@ -667,9 +656,7 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
 	}
 	// Update meta.evenPodsSpreadMetadata if meta.pod has hard spread constraints
 	// and addedPod matches that
-	if err := meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node); err != nil {
-		return err
-	}
+	meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node)
 
 	meta.serviceAffinityMetadata.addPod(addedPod, meta.pod, node)
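The callers above can drop their error handling because updatePod now guards on a nil receiver: with the EvenPodsSpread feature disabled, meta.evenPodsSpreadMetadata is nil and the calls silently no-op. A standalone sketch of that Go idiom, using a toy type rather than the scheduler's:

package main

import "fmt"

type meta struct{ count int }

// Calling a method on a nil *meta is legal in Go as long as the method checks
// the receiver before dereferencing it.
func (m *meta) inc() {
	if m == nil {
		return // feature disabled: a silent no-op, nothing to propagate
	}
	m.count++
}

func main() {
	var m *meta           // nil, like evenPodsSpreadMetadata with the feature off
	m.inc()               // safe no-op
	fmt.Println(m == nil) // still true
}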
Review thread on the topologySpreadConstraint type:

> To avoid confusion with v1.TopologySpreadConstraint, suggest renaming to an internal TopologySpreadConstraint or a similar name.

> One is just the parsed version of the other. I don't see any harm in them having the same name.

> Or let's put a comment to emphasize that (1) it's an internal version with a parsed labels.Selector field instead of a metav1.LabelSelector, and (2) it exists to simplify the matching logic and improve performance.

> Done.