Optimize pod affinity 2 #29379

Merged
285 changes: 185 additions & 100 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"github.com/golang/glog"
@@ -29,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/labels"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -67,20 +69,31 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {

// podMetadata is a type that is passed as metadata for predicate functions
type predicateMetadata struct {
-	podBestEffort bool
-	podRequest    *schedulercache.Resource
-	podPorts      map[int]bool
+	podBestEffort             bool
+	podRequest                *schedulercache.Resource
+	podPorts                  map[int]bool
+	matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
}

-func PredicateMetadata(pod *api.Pod) interface{} {
+type matchingPodAntiAffinityTerm struct {
+	term *api.PodAffinityTerm
+	node *api.Node
+}
+
+func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) interface{} {
+	// If we cannot compute metadata, just return nil
	if pod == nil {
-		// We cannot compute metadata, just return nil
		return nil
	}
+	matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeInfoMap)
+	if err != nil {
+		return nil
+	}
return &predicateMetadata{
-		podBestEffort: isPodBestEffort(pod),
-		podRequest:    getResourceRequest(pod),
-		podPorts:      getUsedPorts(pod),
+		podBestEffort:             isPodBestEffort(pod),
+		podRequest:                getResourceRequest(pod),
+		podPorts:                  getUsedPorts(pod),
+		matchingAntiAffinityTerms: matchingTerms,
}
}
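
`PredicateMetadata` now front-loads the expensive anti-affinity scan: the result is computed once per pod being scheduled and handed to every predicate through the `meta` argument. A self-contained sketch of the consume-or-recompute pattern (the names below are illustrative, not from this file):

```go
package main

import "fmt"

// exampleMetadata mirrors predicateMetadata: expensive facts computed once
// per scheduled pod and threaded through every predicate call.
type exampleMetadata struct {
	usedPorts map[int]bool
}

func computeUsedPorts() map[int]bool {
	// Stands in for an expensive scan over all pods.
	return map[int]bool{8080: true}
}

// podFitsPort consumes the metadata when present and falls back to
// recomputing it, the same consume-or-recompute pattern the predicates
// in this file use.
func podFitsPort(meta interface{}, wantPort int) bool {
	var used map[int]bool
	if m, ok := meta.(*exampleMetadata); ok {
		used = m.usedPorts // fast path: precomputed once
	} else {
		used = computeUsedPorts() // slow path: recompute per call
	}
	return !used[wantPort]
}

func main() {
	meta := &exampleMetadata{usedPorts: computeUsedPorts()}
	fmt.Println(podFitsPort(meta, 8080)) // false: port taken
	fmt.Println(podFitsPort(nil, 9090))  // true, via the fallback path
}
```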

@@ -793,60 +806,54 @@ func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister, failu
return checker.InterPodAffinityMatches
}

-func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+func (c *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
}
-	allPods, err := checker.podLister.List(labels.Everything())
-	if err != nil {
-		return false, err
+	if !c.satisfiesExistingPodsAntiAffinity(pod, meta, node) {
+		return false, nil
	}

	// Now check if <pod> requirements will be satisfied on this node.
	affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
	if err != nil {
		return false, err
	}
-
-	// Check if the current node match the inter-pod affinity scheduling constraints.
-	// Hard inter-pod affinity is not symmetric, check only when affinity.PodAffinity exists.
-	if affinity != nil && affinity.PodAffinity != nil {
-		if !checker.NodeMatchesHardPodAffinity(pod, allPods, node, affinity.PodAffinity) {
-			return false, ErrPodAffinityNotMatch
-		}
-	}
-
-	// Hard inter-pod anti-affinity is symmetric, we should always check it
-	// (also when affinity or affinity.PodAntiAffinity is nil).
-	var antiAffinity *api.PodAntiAffinity
-	if affinity != nil {
-		antiAffinity = affinity.PodAntiAffinity
-	}
-	if !checker.NodeMatchesHardPodAntiAffinity(pod, allPods, node, antiAffinity) {
-		return false, ErrPodAffinityNotMatch
+	if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
+		return true, nil
+	}
+	if !c.satisfiesPodsAffinityAntiAffinity(pod, node, affinity) {
+		return false, nil
	}

if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
podName(pod), node.Name)
}
return true, nil
}
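
The `if glog.V(10)` guard above is deliberate: Go evaluates a call's arguments before the call, so an unguarded `glog.V(10).Infof(...)` still pays for formatting the pod and node even when level-10 logging is off. A minimal standalone illustration of the difference, using a stand-in logger rather than glog:

```go
package main

import "fmt"

var verbosity = 2 // imagine this set by the -v flag

func expensiveDump() string {
	// Stands in for formatting a large Pod/Node object with %+v.
	return fmt.Sprintf("%+v", struct{ Big [1 << 12]byte }{})
}

func logAt(level int, msg string) {
	if verbosity >= level {
		fmt.Println(msg)
	}
}

func main() {
	// Unguarded: expensiveDump() runs even though the message is dropped.
	logAt(10, "pod: "+expensiveDump())

	// Guarded, as in the code above: the costly formatting is skipped.
	if verbosity >= 10 {
		logAt(10, "pod: "+expensiveDump())
	}
}
```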

// AnyPodMatchesPodAffinityTerm checks if any of given pods can match the specific podAffinityTerm.
// First return value indicates whether a matching pod exists on a node that matches the topology key,
// while the second return value indicates whether a matching pod exists anywhere.
// TODO: Do we really need any pod matching, or all pods matching? I think the latter.
-func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, bool, error) {
+func (c *PodAffinityChecker) anyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, term *api.PodAffinityTerm) (bool, bool, error) {
	matchingPodExists := false
-	for _, ep := range allPods {
-		epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
-		if err != nil {
-			return false, matchingPodExists, err
-		}
-		match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(ep, pod, &podAffinityTerm)
+	for _, existingPod := range allPods {
+		match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, pod, term)
		if err != nil {
			return false, matchingPodExists, err
		}
		if match {
			matchingPodExists = true
-			if checker.failureDomains.NodesHaveSameTopologyKey(node, epNode, podAffinityTerm.TopologyKey) {
+			existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName)
+			if err != nil {
+				return false, matchingPodExists, err
+			}
+			if c.failureDomains.NodesHaveSameTopologyKey(node, existingPodNode, term.TopologyKey) {
return true, matchingPodExists, nil
}
}
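
`NodesHaveSameTopologyKey` is what scopes a term: two nodes fall in the same topology domain when they agree on the value of the term's `TopologyKey` label. A simplified sketch with plain label maps (the real helper also special-cases the scheduler's configured failure domains when the key is empty):

```go
package main

import "fmt"

// nodesHaveSameTopologyKey, simplified: two nodes share a topology domain
// iff both carry the topology key and their label values are equal.
func nodesHaveSameTopologyKey(labelsA, labelsB map[string]string, topologyKey string) bool {
	va, okA := labelsA[topologyKey]
	vb, okB := labelsB[topologyKey]
	return okA && okB && va == vb
}

func main() {
	const zoneKey = "failure-domain.beta.kubernetes.io/zone"
	zoneA := map[string]string{zoneKey: "us-east-1a"}
	zoneB := map[string]string{zoneKey: "us-east-1b"}
	fmt.Println(nodesHaveSameTopologyKey(zoneA, zoneA, zoneKey)) // true: same domain
	fmt.Println(nodesHaveSameTopologyKey(zoneA, zoneB, zoneKey)) // false: different zones
}
```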
@@ -880,87 +887,167 @@ func getPodAntiAffinityTerms(podAntiAffinity *api.PodAntiAffinity) (terms []api.
return terms
}

-// Checks whether the given node has pods which satisfy all the required pod affinity scheduling rules.
-// If node has pods which satisfy all the required pod affinity scheduling rules then return true.
-func (checker *PodAffinityChecker) NodeMatchesHardPodAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinity *api.PodAffinity) bool {
-	for _, podAffinityTerm := range getPodAffinityTerms(podAffinity) {
-		podAffinityTermMatches, matchingPodExists, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAffinityTerm)
-		if err != nil {
-			glog.V(10).Infof("Cannot schedule pod %+v onto node %v, an error ocurred when checking existing pods on the node for PodAffinityTerm %v err: %v",
-				podName(pod), node.Name, podAffinityTerm, err)
-			return false
-		}
-		if !podAffinityTermMatches {
-			// If the requiredDuringScheduling affinity requirement matches a pod's own labels and namespace, and there are no other such pods
-			// anywhere, then disregard the requirement.
-			// This allows rules like "schedule all of the pods of this collection to the same zone" to not block forever
-			// because the first pod of the collection can't be scheduled.
-			match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, pod, &podAffinityTerm)
-			if err != nil || !match || matchingPodExists {
-				glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because none of the existing pods on this node satisfy the PodAffinityTerm %v, err: %+v",
-					podName(pod), node.Name, podAffinityTerm, err)
-				return false
-			}
-		}
-	}
-	// all the required pod affinity scheduling rules satisfied
-	glog.V(10).Infof("All the required pod affinity scheduling rules are satisfied for Pod %+v, on node %v", podName(pod), node.Name)
-	return true
-}
+func getMatchingAntiAffinityTerms(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) ([]matchingPodAntiAffinityTerm, error) {
+	allNodeNames := make([]string, 0, len(nodeInfoMap))
+	for name := range nodeInfoMap {
+		allNodeNames = append(allNodeNames, name)
+	}
+
+	var lock sync.Mutex
+	var result []matchingPodAntiAffinityTerm
+	var firstError error
+	appendResult := func(toAppend []matchingPodAntiAffinityTerm) {
+		lock.Lock()
+		defer lock.Unlock()
+		result = append(result, toAppend...)
+	}
+	catchError := func(err error) {
+		lock.Lock()
+		defer lock.Unlock()
+		if firstError == nil {
+			firstError = err
+		}
+	}
+
+	processNode := func(i int) {
+		nodeInfo := nodeInfoMap[allNodeNames[i]]
+		node := nodeInfo.Node()
+		if node == nil {
+			catchError(fmt.Errorf("node not found"))
+			return
+		}
+		var nodeResult []matchingPodAntiAffinityTerm
+		for _, existingPod := range nodeInfo.PodsWithAffinity() {
+			affinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
+			if err != nil {
+				catchError(err)
+				return
+			}
+			if affinity == nil {
+				continue
+			}
+			for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
+				term := term // copy: &term below must not alias the loop variable
+				match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, existingPod, &term)
+				if err != nil {
+					catchError(err)
+					return
+				}
+				if match {
+					nodeResult = append(nodeResult, matchingPodAntiAffinityTerm{term: &term, node: node})
+				}
+			}
+		}
+		if len(nodeResult) > 0 {
+			appendResult(nodeResult)
+		}
+	}
+	workqueue.Parallelize(16, len(allNodeNames), processNode)
+	return result, firstError
+}
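
`workqueue.Parallelize(16, len(allNodeNames), processNode)` fans the per-node work out to 16 workers and blocks until every index has been processed, which is why `appendResult` and `catchError` serialize their writes with a mutex. A self-contained stand-in for that helper, showing the same fan-out and mutex-guarded aggregation:

```go
package main

import (
	"fmt"
	"sync"
)

// parallelize mimics workqueue.Parallelize: run doWork(0..pieces-1)
// across `workers` goroutines and block until all pieces are done.
func parallelize(workers, pieces int, doWork func(i int)) {
	work := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		work <- i
	}
	close(work)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for i := range work {
				doWork(i)
			}
		}()
	}
	wg.Wait()
}

func main() {
	var mu sync.Mutex
	var result []int
	parallelize(4, 10, func(i int) {
		mu.Lock() // same mutex-guarded aggregation as appendResult above
		defer mu.Unlock()
		result = append(result, i*i)
	})
	fmt.Println(len(result)) // 10
}
```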

-// Checks whether the given node has pods which satisfy all the
-// required pod anti-affinity scheduling rules.
-// Also checks whether putting the pod onto the node would break
-// any anti-affinity scheduling rules indicated by existing pods.
-// If node has pods which satisfy all the required pod anti-affinity
-// scheduling rules and scheduling the pod onto the node won't
-// break any existing pods' anti-affinity rules, then return true.
-func (checker *PodAffinityChecker) NodeMatchesHardPodAntiAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAntiAffinity *api.PodAntiAffinity) bool {
-	// foreach element podAntiAffinityTerm of podAntiAffinityTerms
-	// if the pod matches the term (breaks the anti-affinity),
-	// don't schedule the pod onto this node.
-	for _, podAntiAffinityTerm := range getPodAntiAffinityTerms(podAntiAffinity) {
-		podAntiAffinityTermMatches, _, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAntiAffinityTerm)
-		if err != nil || podAntiAffinityTermMatches {
-			glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because not all the existing pods on this node satisfy the PodAntiAffinityTerm %v, err: %v",
-				podName(pod), node.Name, podAntiAffinityTerm, err)
-			return false
-		}
-	}
+func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *api.Pod, allPods []*api.Pod) ([]matchingPodAntiAffinityTerm, error) {
+	var result []matchingPodAntiAffinityTerm
+	for _, existingPod := range allPods {
+		affinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
+		if err != nil {
+			return nil, err
+		}
+		if affinity != nil && affinity.PodAntiAffinity != nil {
+			existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName)
+			if err != nil {
+				return nil, err
+			}
+			for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
+				term := term // copy: &term below must not alias the loop variable
+				match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, existingPod, &term)
+				if err != nil {
+					return nil, err
+				}
+				if match {
+					result = append(result, matchingPodAntiAffinityTerm{term: &term, node: existingPodNode})
+				}
+			}
+		}
+	}
+	return result, nil
+}

-	// Check if scheduling the pod onto this node would break
-	// any anti-affinity rules indicated by the existing pods on the node.
-	// If it would break, system should not schedule pod onto this node.
-	for _, ep := range allPods {
-		epAffinity, err := api.GetAffinityFromPodAnnotations(ep.Annotations)
-		if err != nil {
-			glog.V(10).Infof("Failed to get Affinity from Pod %+v, err: %+v", podName(pod), err)
-			return false
-		}
-		if epAffinity == nil {
-			continue
-		}
-		epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
+// Checks if scheduling the pod onto this node would break any anti-affinity
+// rules indicated by the existing pods.
+func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *api.Pod, meta interface{}, node *api.Node) bool {
+	var matchingTerms []matchingPodAntiAffinityTerm
+	if predicateMeta, ok := meta.(*predicateMetadata); ok {
+		matchingTerms = predicateMeta.matchingAntiAffinityTerms
+	} else {
+		allPods, err := c.podLister.List(labels.Everything())
+		if err != nil {
+			glog.V(10).Infof("Failed to get all pods, %+v", err)
+			return false
+		}
+		if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, allPods); err != nil {
+			glog.V(10).Infof("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
+			return false
+		}
+	}
+	for _, term := range matchingTerms {
+		if c.failureDomains.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
+			glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v",
+				podName(pod), node.Name, term.term)
+			return false
+		}
+	}
+	if glog.V(10) {
+		// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+		// not logged. There is visible performance gain from it.
+		glog.Infof("Schedule Pod %+v on Node %+v is allowed, existing pods anti-affinity rules satisfied.",
+			podName(pod), node.Name)
+	}
+	return true
+}
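
To make the precomputed check concrete: each `matchingPodAntiAffinityTerm` pairs an existing pod's anti-affinity term that the incoming pod already matches with the node that pod runs on, so the incoming pod only has to be rejected from nodes sharing that term's topology domain. A simplified, self-contained walkthrough (labels only; the zone key is just an example):

```go
package main

import "fmt"

// A precomputed match: an existing pod's anti-affinity term that the incoming
// pod's labels already matched, remembered with the labels of that pod's node.
type matchingTerm struct {
	topologyKey string
	nodeLabels  map[string]string // labels of the node hosting the existing pod
}

// violates reports whether placing the incoming pod on a node with the given
// labels would land it in the same topology domain as the matched term.
func violates(nodeLabels map[string]string, m matchingTerm) bool {
	v1, ok1 := nodeLabels[m.topologyKey]
	v2, ok2 := m.nodeLabels[m.topologyKey]
	return ok1 && ok2 && v1 == v2
}

func main() {
	zone := "failure-domain.beta.kubernetes.io/zone"
	m := matchingTerm{topologyKey: zone, nodeLabels: map[string]string{zone: "us-east-1a"}}
	fmt.Println(violates(map[string]string{zone: "us-east-1a"}, m)) // true: same zone, reject
	fmt.Println(violates(map[string]string{zone: "us-east-1b"}, m)) // false: different zone, allow
}
```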

-		if err != nil {
-			glog.V(10).Infof("Failed to get node from Pod %+v, err: %+v", podName(ep), err)
-			return false
-		}
-		for _, epAntiAffinityTerm := range getPodAntiAffinityTerms(epAffinity.PodAntiAffinity) {
-			match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, ep, &epAntiAffinityTerm)
-			if err != nil {
-				glog.V(10).Infof("Failed to get label selector from anti-affinityterm %+v of existing pod %+v, err: %+v", epAntiAffinityTerm, podName(pod), err)
-				return false
-			}
-			if match && checker.failureDomains.NodesHaveSameTopologyKey(node, epNode, epAntiAffinityTerm.TopologyKey) {
-				glog.V(10).Infof("Cannot schedule Pod %+v, onto node %v because the pod would break the PodAntiAffinityTerm %+v, of existing pod %+v, err: %v",
-					podName(pod), node.Name, epAntiAffinityTerm, podName(ep), err)
-				return false
-			}
-		}
-	}
-	// all the required pod anti-affinity scheduling rules are satisfied
-	glog.V(10).Infof("Can schedule Pod %+v, on node %v because all the required pod anti-affinity scheduling rules are satisfied", podName(pod), node.Name)
-	return true
-}
+// Checks if scheduling the pod onto this node would break any rules of this pod.
+func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *api.Pod, node *api.Node, affinity *api.Affinity) bool {
+	allPods, err := c.podLister.List(labels.Everything())
+	if err != nil {
+		return false
+	}
+
+	// Check all affinity terms.
+	for _, term := range getPodAffinityTerms(affinity.PodAffinity) {
+		termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
+		if err != nil {
+			glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v",
+				podName(pod), node.Name, term, err)
+			return false
+		}
+		if !termMatches {
+			// If the requirement matches a pod's own labels and namespace, and there are
+			// no other such pods, then disregard the requirement. This is necessary to
+			// not block forever because the first pod of the collection can't be scheduled.
+			match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, pod, &term)
+			if err != nil || !match || matchingPodExists {
+				glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v",
+					podName(pod), node.Name, term, err)
+				return false
+			}
+		}
+	}
+
+	// Check all anti-affinity terms.
+	for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
+		termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
+		if err != nil || termMatches {
+			glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
+				podName(pod), node.Name, term, err)
+			return false
+		}
+	}
+
+	if glog.V(10) {
+		// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+		// not logged. There is visible performance gain from it.
+		glog.Infof("Schedule Pod %+v on Node %+v is allowed, pod affinity/anti-affinity constraints satisfied.",
+			podName(pod), node.Name)
+	}
+	return true
+}
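
For reference, at this point in the API the affinity consumed by `api.GetAffinityFromPodAnnotations` still lives in an alpha pod annotation rather than a first-class field. A sketch of what such an annotation might look like; the exact key and JSON shape follow the alpha API as best recalled, so treat the details as illustrative:

```go
package main

import "fmt"

func main() {
	// Illustrative only: an annotation that GetAffinityFromPodAnnotations
	// would parse into an Affinity carrying one hard anti-affinity term.
	annotations := map[string]string{
		"scheduler.alpha.kubernetes.io/affinity": `{
			"podAntiAffinity": {
				"requiredDuringSchedulingIgnoredDuringExecution": [{
					"labelSelector": {"matchLabels": {"app": "web"}},
					"topologyKey": "kubernetes.io/hostname"
				}]
			}
		}`,
	}
	fmt.Println(annotations["scheduler.alpha.kubernetes.io/affinity"])
}
```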

@@ -1026,9 +1113,7 @@ func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *
}

var podBestEffort bool

-	predicateMeta, ok := meta.(*predicateMetadata)
-	if ok {
+	if predicateMeta, ok := meta.(*predicateMetadata); ok {
podBestEffort = predicateMeta.podBestEffort
} else {
// We couldn't parse metadata - fallback to computing it.