Merge pull request #79063 from Huang-Wei/eps-priority
Even Pods Spread - 5. Priority Core
k8s-ci-robot committed Jul 27, 2019
2 parents 0f32f9e + 755a311 commit b344fb0
Showing 8 changed files with 775 additions and 17 deletions.
19 changes: 10 additions & 9 deletions pkg/scheduler/algorithm/predicates/metadata.go
@@ -229,12 +229,12 @@ func getTPMapMatchingSpreadConstraints(pod *v1.Pod, nodeInfoMap map[string]*sche
}
// In accordance with the design, if NodeAffinity or NodeSelector is defined,
// spreading is applied to nodes that pass those filters.
- if !podMatchesNodeSelectorAndAffinityTerms(pod, node) {
+ if !PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
return
}

// Ensure the current node's labels contain all topologyKeys in 'constraints'.
- if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
+ if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
return
}
nodeTopologyMaps := newTopologyPairsMaps()
@@ -245,7 +245,7 @@ func getTPMapMatchingSpreadConstraints(pod *v1.Pod, nodeInfoMap map[string]*sche
if existingPod.Namespace != pod.Namespace {
continue
}
- ok, err := podMatchesSpreadConstraint(existingPod.Labels, constraint)
+ ok, err := PodMatchesSpreadConstraint(existingPod.Labels, constraint)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
@@ -304,10 +304,11 @@ func getHardTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpr
return
}

- // some corner cases:
+ // PodMatchesSpreadConstraint verifies if <constraint.LabelSelector> matches <podLabelSet>.
+ // Some corner cases:
// 1. podLabelSet = nil => returns (false, nil)
// 2. constraint.LabelSelector = nil => returns (false, nil)
- func podMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySpreadConstraint) (bool, error) {
+ func PodMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySpreadConstraint) (bool, error) {
selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
if err != nil {
return false, err
@@ -318,8 +319,8 @@ func podMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySp
return true, nil
}

- // check if ALL topology keys in spread constraints are present in node labels
- func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []v1.TopologySpreadConstraint) bool {
+ // NodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
+ func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []v1.TopologySpreadConstraint) bool {
for _, constraint := range constraints {
if _, ok := nodeLabels[constraint.TopologyKey]; !ok {
return false
@@ -388,15 +389,15 @@ func (m *topologyPairsPodSpreadMap) addPod(addedPod, preemptorPod *v1.Pod, node
return nil
}
constraints := getHardTopologySpreadConstraints(preemptorPod)
- if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
+ if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
return nil
}

// records which topology key(s) needs to be updated
minMatchNeedingUpdate := make(map[string]struct{})
podLabelSet := labels.Set(addedPod.Labels)
for _, constraint := range constraints {
- if match, err := podMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
+ if match, err := PodMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
return err
} else if !match {
continue
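
With PodMatchesSpreadConstraint now exported, other scheduler packages can call it directly. A minimal sketch of its behavior, covering the two documented corner cases; the constraint, labels, and values below are hypothetical and not part of this commit:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
)

func main() {
	// A spread constraint whose selector requires app=foo (made-up values).
	constraint := v1.TopologySpreadConstraint{
		MaxSkew:           1,
		TopologyKey:       "zone",
		WhenUnsatisfiable: v1.DoNotSchedule,
		LabelSelector: &metav1.LabelSelector{
			MatchLabels: map[string]string{"app": "foo"},
		},
	}

	// Matching pod labels => (true, nil).
	fmt.Println(predicates.PodMatchesSpreadConstraint(labels.Set{"app": "foo"}, constraint))
	// Corner case 1: nil podLabelSet => (false, nil).
	fmt.Println(predicates.PodMatchesSpreadConstraint(nil, constraint))
	// Corner case 2: nil LabelSelector => (false, nil).
	constraint.LabelSelector = nil
	fmt.Println(predicates.PodMatchesSpreadConstraint(labels.Set{"app": "foo"}, constraint))
}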
6 changes: 3 additions & 3 deletions pkg/scheduler/algorithm/predicates/metadata_test.go
@@ -904,12 +904,12 @@ func TestPodMatchesSpreadConstraint(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
podLabelSet := labels.Set(tt.podLabels)
- got, err := podMatchesSpreadConstraint(podLabelSet, tt.constraint)
+ got, err := PodMatchesSpreadConstraint(podLabelSet, tt.constraint)
if (err != nil) != tt.wantErr {
t.Errorf("podMatchesSpreadConstraint() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("PodMatchesSpreadConstraint() error = %v, wantErr %v", err, tt.wantErr)
}
if got != tt.want {
t.Errorf("podMatchesSpreadConstraint() = %v, want %v", got, tt.want)
t.Errorf("PodMatchesSpreadConstraint() = %v, want %v", got, tt.want)
}
})
}
8 changes: 4 additions & 4 deletions pkg/scheduler/algorithm/predicates/predicates.go
@@ -853,9 +853,9 @@ func nodeMatchesNodeSelectorTerms(node *v1.Node, nodeSelectorTerms []v1.NodeSele
return v1helper.MatchNodeSelectorTerms(nodeSelectorTerms, labels.Set(node.Labels), fields.Set(nodeFields))
}

- // podMatchesNodeSelectorAndAffinityTerms checks whether the pod is schedulable onto nodes according to
+ // PodMatchesNodeSelectorAndAffinityTerms checks whether the pod is schedulable onto nodes according to
// the requirements in both NodeAffinity and nodeSelector.
- func podMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
+ func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
// Check if node.Labels match pod.Spec.NodeSelector.
if len(pod.Spec.NodeSelector) > 0 {
selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
@@ -906,7 +906,7 @@ func PodMatchNodeSelector(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedul
if node == nil {
return false, nil, fmt.Errorf("node not found")
}
- if podMatchesNodeSelectorAndAffinityTerms(pod, node) {
+ if PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
return true, nil, nil
}
return false, []PredicateFailureReason{ErrNodeSelectorNotMatch}, nil
@@ -1748,7 +1748,7 @@ func EvenPodsSpreadPredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *sche
return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
}

- selfMatch, err := podMatchesSpreadConstraint(podLabelSet, constraint)
+ selfMatch, err := PodMatchesSpreadConstraint(podLabelSet, constraint)
if err != nil {
return false, nil, err
}
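
PodMatchesNodeSelectorAndAffinityTerms is likewise exported so that the new priority can reuse it. A short sketch (hypothetical pod and nodes, not from this commit) showing that a plain nodeSelector mismatch fails the check:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
)

func main() {
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			NodeSelector: map[string]string{"disktype": "ssd"}, // made-up selector
		},
	}
	ssdNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"disktype": "ssd"}}}
	hddNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"disktype": "hdd"}}}

	fmt.Println(predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, ssdNode)) // true
	fmt.Println(predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, hddNode)) // false
}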
3 changes: 3 additions & 0 deletions pkg/scheduler/algorithm/priorities/BUILD
@@ -10,6 +10,7 @@ go_library(
name = "go_default_library",
srcs = [
"balanced_resource_allocation.go",
"even_pods_spread.go",
"image_locality.go",
"interpod_affinity.go",
"least_requested.go",
@@ -37,6 +38,7 @@ go_library(
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -54,6 +56,7 @@ go_test(
name = "go_default_test",
srcs = [
"balanced_resource_allocation_test.go",
"even_pods_spread_test.go",
"image_locality_test.go",
"interpod_affinity_test.go",
"least_requested_test.go",
209 changes: 209 additions & 0 deletions pkg/scheduler/algorithm/priorities/even_pods_spread.go
@@ -0,0 +1,209 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package priorities

import (
"context"
"math"
"sync/atomic"

"k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"

"k8s.io/klog"
)

type topologyPair struct {
key string
value string
}

type topologySpreadConstraintsMap struct {
// nodeNameToPodCounts is keyed with node name, and valued with the number of matching pods.
nodeNameToPodCounts map[string]int64
// topologyPairToPodCounts is keyed with topologyPair, and valued with a pointer to the number of matching pods.
topologyPairToPodCounts map[topologyPair]*int64
}

func newTopologySpreadConstraintsMap() *topologySpreadConstraintsMap {
return &topologySpreadConstraintsMap{
nodeNameToPodCounts: make(map[string]int64),
topologyPairToPodCounts: make(map[topologyPair]*int64),
}
}

// Note: the <nodes> passed in are the "filtered" nodes which have passed Predicates.
// This function iterates over <nodes> to filter out the nodes which don't have the required topologyKey(s),
// and initializes two maps:
// 1) t.topologyPairToPodCounts: keyed with each eligible topology pair, and valued with a *int64 counter.
// 2) t.nodeNameToPodCounts: keyed with the name of each eligible node, and valued with the number of matching pods (initialized to 0).
func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) {
constraints := getSoftTopologySpreadConstraints(pod)
for _, node := range nodes {
if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
continue
}
for _, constraint := range constraints {
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
if t.topologyPairToPodCounts[pair] == nil {
t.topologyPairToPodCounts[pair] = new(int64)
}
}
t.nodeNameToPodCounts[node.Name] = 0
// For nodes which don't have all the required topologyKeys present, we intentionally
// leave their entries absent from nodeNameToPodCounts, so that we can score them as 0 afterwards.
}
}
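
// For example (hypothetical data, not from this commit): given a single soft
// constraint on topologyKey "zone" and two filtered nodes, node-a labeled
// zone=zone1 and node-b labeled zone=zone2, initialize() yields:
//   topologyPairToPodCounts: {zone, zone1} -> new(int64), {zone, zone2} -> new(int64)
//   nodeNameToPodCounts:     node-a -> 0, node-b -> 0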

// CalculateEvenPodsSpreadPriority computes a score by checking through the topologySpreadConstraints
// that have WhenUnsatisfiable=ScheduleAnyway (a.k.a. soft constraints).
// The function works as follows:
// 1) Across all nodes, count the pods which match <pod>'s soft topology spread constraints.
// 2) Group the counts calculated in 1) by topologyPair, and sum them up for the corresponding candidate nodes.
// 3) Finally, normalize the counts to 0~10. The node with the highest score is the most preferred.
// Note: symmetry is not applicable here. We only weigh how the incoming pod matches existing pods;
// whether an existing pod matches the incoming pod doesn't contribute to the final score.
// This is different from the Affinity API.
func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
result := make(schedulerapi.HostPriorityList, len(nodes))
// Return early if the incoming pod doesn't have soft topology spread constraints.
constraints := getSoftTopologySpreadConstraints(pod)
if len(constraints) == 0 {
return result, nil
}

t := newTopologySpreadConstraintsMap()
t.initialize(pod, nodes)

allNodeNames := make([]string, 0, len(nodeNameToInfo))
for name := range nodeNameToInfo {
allNodeNames = append(allNodeNames, name)
}

errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
processAllNode := func(i int) {
nodeInfo := nodeNameToInfo[allNodeNames[i]]
node := nodeInfo.Node()
if node == nil {
return
}
// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
// (2) All topologyKeys need to be present in `node`
if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
!predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
return
}

for _, constraint := range constraints {
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
// If current topology pair is not associated with any candidate node,
// continue to avoid unnecessary calculation.
if t.topologyPairToPodCounts[pair] == nil {
continue
}

// <matchSum> indicates how many pods (on current node) match the <constraint>.
matchSum := int64(0)
for _, existingPod := range nodeInfo.Pods() {
match, err := predicates.PodMatchesSpreadConstraint(existingPod.Labels, constraint)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
if match {
matchSum++
}
}
atomic.AddInt64(t.topologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processAllNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
}

var minCount int64 = math.MaxInt64
// <total> sums up the number of matching pods on each qualified topology pair
var total int64
for _, node := range nodes {
if _, ok := t.nodeNameToPodCounts[node.Name]; !ok {
continue
}

// For each present <pair>, the current node gets a credit of <matchSum>.
// We also add <matchSum> to <total> so that the final score can be inverted later.
for _, constraint := range constraints {
if tpVal, ok := node.Labels[constraint.TopologyKey]; ok {
pair := topologyPair{key: constraint.TopologyKey, value: tpVal}
matchSum := *t.topologyPairToPodCounts[pair]
t.nodeNameToPodCounts[node.Name] += matchSum
total += matchSum
}
}
if t.nodeNameToPodCounts[node.Name] < minCount {
minCount = t.nodeNameToPodCounts[node.Name]
}
}
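// For example (hypothetical numbers): with one "zone" constraint, one candidate
// node per zone, and per-zone matching-pod counts zone1=2, zone2=1 and zone3=0,
// the loop above ends with total=3 and minCount=0.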

// Calculate the final priority score for each node.
// TODO(Huang-Wei): in the alpha version, we keep the formula as simple as possible.
// The current version ranks the nodes properly, but it doesn't take MaxSkew into
// consideration; we may come up with a better formula in the future.
maxMinDiff := total - minCount
for i := range nodes {
node := nodes[i]
result[i].Host = node.Name

// For debugging purposes: print the score for each node.
// The score must be captured by pointer here, otherwise it's always 0.
if klog.V(10) {
defer func(score *int, nodeName string) {
klog.Infof("%v -> %v: EvenPodsSpreadPriority, Score: (%d)", pod.Name, nodeName, *score)
}(&result[i].Score, node.Name)
}

if _, ok := t.nodeNameToPodCounts[node.Name]; !ok {
result[i].Score = 0
continue
}
if maxMinDiff == 0 {
result[i].Score = schedulerapi.MaxPriority
continue
}
fScore := float64(schedulerapi.MaxPriority) * (float64(total-t.nodeNameToPodCounts[node.Name]) / float64(maxMinDiff))
result[i].Score = int(fScore)
}

return result, nil
}

// TODO(Huang-Wei): combine this with getHardTopologySpreadConstraints() in predicates package
func getSoftTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpreadConstraint) {
if pod != nil {
for _, constraint := range pod.Spec.TopologySpreadConstraints {
if constraint.WhenUnsatisfiable == v1.ScheduleAnyway {
constraints = append(constraints, constraint)
}
}
}
return
}
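
To make the scoring formula concrete, here is a self-contained sketch (illustrative only; the counts are made up and nothing below is part of this commit) that reproduces the normalization step of CalculateEvenPodsSpreadPriority for three candidate nodes, one per zone, with accumulated matching-pod counts of 2, 1 and 0. The node in the emptiest topology domain receives the maximum score:

package main

import (
	"fmt"
	"math"
)

func main() {
	const maxPriority = 10 // schedulerapi.MaxPriority
	// Accumulated matching-pod counts per candidate node (hypothetical data).
	counts := map[string]int64{"node-zone1": 2, "node-zone2": 1, "node-zone3": 0}

	var total int64
	var minCount int64 = math.MaxInt64
	for _, c := range counts {
		total += c
		if c < minCount {
			minCount = c
		}
	}

	// Same formula as above: score = maxPriority * (total - count) / (total - minCount).
	maxMinDiff := total - minCount // 3 - 0 = 3
	for name, c := range counts {
		score := int(float64(maxPriority) * float64(total-c) / float64(maxMinDiff))
		fmt.Printf("%s -> %d\n", name, score)
	}
	// Prints (in map iteration order): node-zone1 -> 3, node-zone2 -> 6, node-zone3 -> 10
}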