
First step of optimizing PodAffinity priority function #28952

Merged: 1 commit, Jul 15, 2016
9 changes: 5 additions & 4 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -853,10 +853,11 @@ func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta in
// AnyPodMatchesPodAffinityTerm checks if any of given pods can match the specific podAffinityTerm.
func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, error) {
for _, ep := range allPods {
-		match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm,
-			func(ep *api.Pod) (*api.Node, error) { return checker.info.GetNodeInfo(ep.Spec.NodeName) },
-			func(pod *api.Pod) (*api.Node, error) { return node, nil },
-		)
+		epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
+		if err != nil {
+			return false, err
+		}
+		match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, epNode, node, pod, podAffinityTerm)
if err != nil || match {
return match, err
}
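The refactored call above resolves the existing pod's node once in the caller and passes both nodes into the helper, instead of handing it node-lookup closures. The reworked helper itself is not part of this diff; the sketch below is an assumption that infers its signature from the new call site and reuses the body of the closure-based version removed from non_zero.go further down, so parameter names and ordering may differ from the merged code.

// Sketch only: signature inferred from the new call
// CheckIfPodMatchPodAffinityTerm(ep, epNode, node, pod, podAffinityTerm) above.
// Assumes GetNamespacesFromPodAffinityTerm and NodesHaveSameTopologyKey remain
// available in the same package.
func (tps *Topologies) CheckIfPodMatchPodAffinityTerm(existingPod *api.Pod, existingPodNode, podNode *api.Node,
	affinityPod *api.Pod, term api.PodAffinityTerm) (bool, error) {
	// Namespace check: an empty set means "all namespaces".
	names := GetNamespacesFromPodAffinityTerm(affinityPod, term)
	if len(names) != 0 && !names.Has(existingPod.Namespace) {
		return false, nil
	}
	// Label-selector check against the existing pod's labels.
	selector, err := unversioned.LabelSelectorAsSelector(term.LabelSelector)
	if err != nil || !selector.Matches(labels.Set(existingPod.Labels)) {
		return false, err
	}
	// Topology check: the existing pod's node and the candidate node must share
	// the same value for term.TopologyKey.
	return tps.NodesHaveSameTopologyKey(existingPodNode, podNode, term.TopologyKey), nil
}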
213 changes: 102 additions & 111 deletions plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go
@@ -19,6 +19,7 @@ package priorities
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@@ -35,7 +36,12 @@ type InterPodAffinity struct {
failureDomains priorityutil.Topologies
}

func NewInterPodAffinityPriority(info predicates.NodeInfo, nodeLister algorithm.NodeLister, podLister algorithm.PodLister, hardPodAffinityWeight int, failureDomains []string) algorithm.PriorityFunction {
func NewInterPodAffinityPriority(
info predicates.NodeInfo,
nodeLister algorithm.NodeLister,
podLister algorithm.PodLister,
hardPodAffinityWeight int,
failureDomains []string) algorithm.PriorityFunction {
interPodAffinity := &InterPodAffinity{
info: info,
nodeLister: nodeLister,
@@ -46,36 +52,19 @@ func NewInterPodAffinityPriority(info predicates.NodeInfo, nodeLister algorithm.
return interPodAffinity.CalculateInterPodAffinityPriority
}

// countPodsThatMatchPodAffinityTerm counts the number of given pods that match the podAffinityTerm.
func (ipa *InterPodAffinity) CountPodsThatMatchPodAffinityTerm(pod *api.Pod, podsForMatching []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (int, error) {
matchedCount := 0
for _, ep := range podsForMatching {
match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm,
func(ep *api.Pod) (*api.Node, error) {
return ipa.info.GetNodeInfo(ep.Spec.NodeName)
},
func(pod *api.Pod) (*api.Node, error) {
return node, nil
},
)
if err != nil {
return 0, err
}
if match {
matchedCount++
}
// TODO: Share it with predicates by moving to better location.
// TODO: Can we avoid error handling here - this is only a matter of non-parsable selector?
func podMatchesNamespaceAndSelector(pod *api.Pod, affinityPod *api.Pod, term *api.PodAffinityTerm) (bool, error) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(affinityPod, *term)
if len(namespaces) != 0 && !namespaces.Has(pod.Namespace) {
return false, nil
}
return matchedCount, nil
}

// CountWeightByPodMatchAffinityTerm counts the weight to topologyCounts for all the given pods that match the podAffinityTerm.
func (ipa *InterPodAffinity) CountWeightByPodMatchAffinityTerm(pod *api.Pod, podsForMatching []*api.Pod, weight int, podAffinityTerm api.PodAffinityTerm, node *api.Node) (int, error) {
if weight == 0 {
return 0, nil
selector, err := unversioned.LabelSelectorAsSelector(term.LabelSelector)
if err != nil || !selector.Matches(labels.Set(pod.Labels)) {
return false, err
}
// get the pods which are there in that particular node
podsMatchedCount, err := ipa.CountPodsThatMatchPodAffinityTerm(pod, podsForMatching, node, podAffinityTerm)
return weight * podsMatchedCount, err
return true, nil
}

// compute a sum by iterating through the elements of weightedPodAffinityTerm and adding
@@ -98,99 +87,100 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
}

// convert the topology key based weights to the node name based weights
var maxCount int
var minCount int
counts := map[string]int{}
for _, node := range nodes {
totalCount := 0
// count weights for the weighted pod affinity
if affinity.PodAffinity != nil {
for _, weightedTerm := range affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
weightedCount, err := ipa.CountWeightByPodMatchAffinityTerm(pod, allPods, weightedTerm.Weight, weightedTerm.PodAffinityTerm, node)
if err != nil {
return nil, err
var maxCount float64
var minCount float64
// counts store the mapping from node name to so-far computed score of
// the node.
counts := make(map[string]float64, len(nodes))

processTerm := func(term *api.PodAffinityTerm, affinityPod, podToCheck *api.Pod, fixedNode *api.Node, weight float64) error {
match, err := podMatchesNamespaceAndSelector(podToCheck, affinityPod, term)
if err != nil {
return err
}
if match {
for _, node := range nodes {
if ipa.failureDomains.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) {
counts[node.Name] += weight
}
totalCount += weightedCount
}
}

// count weights for the weighted pod anti-affinity
if affinity.PodAntiAffinity != nil {
for _, weightedTerm := range affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
weightedCount, err := ipa.CountWeightByPodMatchAffinityTerm(pod, allPods, (0 - weightedTerm.Weight), weightedTerm.PodAffinityTerm, node)
if err != nil {
return nil, err
}
totalCount += weightedCount
return nil
}
processTerms := func(terms []api.WeightedPodAffinityTerm, affinityPod, podToCheck *api.Pod, fixedNode *api.Node, multiplier int) error {
for _, weightedTerm := range terms {
if err := processTerm(&weightedTerm.PodAffinityTerm, affinityPod, podToCheck, fixedNode, float64(weightedTerm.Weight*multiplier)); err != nil {
return err
}
}
return nil
}

// reverse direction checking: count weights for the inter-pod affinity/anti-affinity rules
// that are indicated by existing pods on the node.
for _, ep := range allPods {
epAffinity, err := api.GetAffinityFromPodAnnotations(ep.Annotations)
if err != nil {
for _, existingPod := range allPods {
existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName)
if err != nil {
return nil, err
}
existingPodAffinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
if err != nil {
return nil, err
}

if affinity.PodAffinity != nil {
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil {
return nil, err
}

if epAffinity.PodAffinity != nil {
// count the implicit weight for the hard pod affinity indicated by the existing pod.
if ipa.hardPodAffinityWeight > 0 {
var podAffinityTerms []api.PodAffinityTerm
if len(epAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
podAffinityTerms = epAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// podAffinityTerms = append(podAffinityTerms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, epAffinityTerm := range podAffinityTerms {
match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epAffinityTerm,
func(pod *api.Pod) (*api.Node, error) { return node, nil },
func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
)
if err != nil {
return nil, err
}
if match {
totalCount += ipa.hardPodAffinityWeight
}
}
}

// count weight for the weighted pod affinity indicated by the existing pod.
for _, epWeightedTerm := range epAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epWeightedTerm.PodAffinityTerm,
func(pod *api.Pod) (*api.Node, error) { return node, nil },
func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
)
if err != nil {
return nil, err
}
if match {
totalCount += epWeightedTerm.Weight
}
}
}
if affinity.PodAntiAffinity != nil {
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil {
return nil, err
}
}

// count weight for the weighted pod anti-affinity indicated by the existing pod.
if epAffinity.PodAntiAffinity != nil {
for _, epWeightedTerm := range epAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epWeightedTerm.PodAffinityTerm,
func(pod *api.Pod) (*api.Node, error) { return node, nil },
func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
)
if err != nil {
if existingPodAffinity.PodAffinity != nil {
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
if ipa.hardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, term := range terms {
if err := processTerm(&term, existingPod, pod, existingPodNode, float64(ipa.hardPodAffinityWeight)); err != nil {
return nil, err
}
if match {
totalCount -= epWeightedTerm.Weight
}
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil {
return nil, err
}
}
if existingPodAffinity.PodAntiAffinity != nil {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil {
return nil, err
}
}
}

counts[node.Name] = totalCount
for _, node := range nodes {
if counts[node.Name] > maxCount {
maxCount = counts[node.Name]
}
Expand All @@ -200,17 +190,18 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
}

// calculate final priority score for each node
result := []schedulerapi.HostPriority{}
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for _, node := range nodes {
fScore := float64(0)
if (maxCount - minCount) > 0 {
fScore = 10 * (float64(counts[node.Name]-minCount) / float64(maxCount-minCount))
fScore = 10 * ((counts[node.Name] - minCount) / (maxCount - minCount))
}
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
glog.V(10).Infof(
"%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
)
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.V(10).Infof("%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
}
}

return result, nil
}
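To make the restructuring above concrete, here is a simplified, self-contained model of the new scoring flow: each existing pod that matches a term contributes the term's signed weight once to every node sharing the term's topology key with that pod's node, and the per-node sums are then min-max normalized into the scheduler's 0-10 range. Node names, labels, and weights below are made up; the real code operates on api.Node/api.Pod via the Topologies helper.

package main

import "fmt"

func main() {
	topologyKey := "zone"
	// Hypothetical cluster: node name -> node labels.
	nodes := map[string]map[string]string{
		"node-a": {"zone": "z1"},
		"node-b": {"zone": "z1"},
		"node-c": {"zone": "z2"},
	}
	nodeNames := []string{"node-a", "node-b", "node-c"}

	// Hypothetical matches: the node of an existing pod that matched a term,
	// plus the term's signed weight (positive for affinity, negative for
	// anti-affinity).
	matches := []struct {
		podNode string
		weight  float64
	}{
		{"node-a", 100}, // a soft affinity term matched a pod on node-a
		{"node-c", -50}, // a soft anti-affinity term matched a pod on node-c
	}

	// Spread each match's weight to every node in the same topology domain,
	// mirroring the processTerm closure above.
	counts := make(map[string]float64, len(nodes))
	for _, m := range matches {
		for _, name := range nodeNames {
			v := nodes[name][topologyKey]
			if v != "" && v == nodes[m.podNode][topologyKey] {
				counts[name] += m.weight
			}
		}
	}

	// Min-max normalize into the 0-10 score range, as in the final loop above.
	var maxCount, minCount float64
	for _, name := range nodeNames {
		if counts[name] > maxCount {
			maxCount = counts[name]
		}
		if counts[name] < minCount {
			minCount = counts[name]
		}
	}
	for _, name := range nodeNames {
		fScore := float64(0)
		if maxCount-minCount > 0 {
			fScore = 10 * ((counts[name] - minCount) / (maxCount - minCount))
		}
		fmt.Printf("%s: %d\n", name, int(fScore)) // node-a: 10, node-b: 10, node-c: 0
	}
}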
83 changes: 0 additions & 83 deletions plugin/pkg/scheduler/algorithm/priorities/util/non_zero.go
@@ -18,9 +18,6 @@ package util

import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
)

// For each of these resources, a pod that doesn't request the resource explicitly
@@ -53,83 +50,3 @@ func GetNonzeroRequests(requests *api.ResourceList) (int64, int64) {
}
return outMilliCPU, outMemory
}

// FilterPodsByNameSpaces filters the pods based the given list of namespaces,
// empty set of namespaces means all namespaces.
func FilterPodsByNameSpaces(names sets.String, pods []*api.Pod) []*api.Pod {
if len(pods) == 0 || len(names) == 0 {
return pods
}
result := []*api.Pod{}
for _, pod := range pods {
if names.Has(pod.Namespace) {
result = append(result, pod)
}
}
return result
}

// GetNamespacesFromPodAffinityTerm returns a set of names
// according to the namespaces indicated in podAffinityTerm.
// if the NameSpaces is nil considers the given pod's namespace
// if the Namespaces is empty list then considers all the namespaces
func GetNamespacesFromPodAffinityTerm(pod *api.Pod, podAffinityTerm api.PodAffinityTerm) sets.String {
names := sets.String{}
if podAffinityTerm.Namespaces == nil {
names.Insert(pod.Namespace)
} else if len(podAffinityTerm.Namespaces) != 0 {
names.Insert(podAffinityTerm.Namespaces...)
}
return names
}

// NodesHaveSameTopologyKeyInternal checks if nodeA and nodeB have same label value with given topologyKey as label key.
func NodesHaveSameTopologyKeyInternal(nodeA, nodeB *api.Node, topologyKey string) bool {
return nodeA.Labels != nil && nodeB.Labels != nil && len(nodeA.Labels[topologyKey]) > 0 && nodeA.Labels[topologyKey] == nodeB.Labels[topologyKey]
}

type Topologies struct {
DefaultKeys []string
}

// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key.
// If the topologyKey is nil/empty, check if the two nodes have any of the default topologyKeys, and have same corresponding label value.
func (tps *Topologies) NodesHaveSameTopologyKey(nodeA *api.Node, nodeB *api.Node, topologyKey string) bool {
if len(topologyKey) == 0 {
// assumes this is allowed only for PreferredDuringScheduling pod anti-affinity (ensured by api/validation)
for _, defaultKey := range tps.DefaultKeys {
if NodesHaveSameTopologyKeyInternal(nodeA, nodeB, defaultKey) {
return true
}
}
return false
} else {
return NodesHaveSameTopologyKeyInternal(nodeA, nodeB, topologyKey)
}
}

type getNodeFunc func(*api.Pod) (*api.Node, error)

// CheckIfPodMatchPodAffinityTerm checks if podB's affinity request is compatible with podA
func (tps *Topologies) CheckIfPodMatchPodAffinityTerm(podA *api.Pod, podB *api.Pod, podBAffinityTerm api.PodAffinityTerm, getNodeA, getNodeB getNodeFunc) (bool, error) {
names := GetNamespacesFromPodAffinityTerm(podB, podBAffinityTerm)
if len(names) != 0 && !names.Has(podA.Namespace) {
return false, nil
}

labelSelector, err := unversioned.LabelSelectorAsSelector(podBAffinityTerm.LabelSelector)
if err != nil || !labelSelector.Matches(labels.Set(podA.Labels)) {
return false, err
}

podANode, err := getNodeA(podA)
if err != nil {
return false, err
}
podBNode, err := getNodeB(podB)
if err != nil {
return false, err
}

return tps.NodesHaveSameTopologyKey(podANode, podBNode, podBAffinityTerm.TopologyKey), nil
}
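The helpers removed from this file (their replacement implementation is not shown in this diff) all hinge on the same topology-key comparison. A minimal standalone sketch of that comparison, using hypothetical node labels:

package main

import "fmt"

// nodesHaveSameTopologyKey mirrors the removed NodesHaveSameTopologyKeyInternal:
// two nodes "match" for a topology key when both carry a non-empty, identical
// value for that label key.
func nodesHaveSameTopologyKey(labelsA, labelsB map[string]string, topologyKey string) bool {
	return labelsA != nil && labelsB != nil &&
		len(labelsA[topologyKey]) > 0 &&
		labelsA[topologyKey] == labelsB[topologyKey]
}

func main() {
	// Hypothetical labels for two nodes in the same zone but on different hosts.
	nodeA := map[string]string{
		"failure-domain.beta.kubernetes.io/zone": "us-central1-a",
		"kubernetes.io/hostname":                 "node-a",
	}
	nodeB := map[string]string{
		"failure-domain.beta.kubernetes.io/zone": "us-central1-a",
		"kubernetes.io/hostname":                 "node-b",
	}

	fmt.Println(nodesHaveSameTopologyKey(nodeA, nodeB, "failure-domain.beta.kubernetes.io/zone")) // true
	fmt.Println(nodesHaveSameTopologyKey(nodeA, nodeB, "kubernetes.io/hostname"))                  // false
}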