More pod-affinity code cleanup and prepare for parallelization #29109

Merged
merged 1 commit on Jul 22, 2016
178 changes: 82 additions & 96 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -802,73 +802,101 @@ func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta in
if err != nil {
return false, err
}
if checker.NodeMatchPodAffinityAntiAffinity(pod, allPods, node) {
return true, nil
affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
if err != nil {
return false, err
}

// Check if the current node matches the inter-pod affinity scheduling constraints.
// Hard inter-pod affinity is not symmetric, check only when affinity.PodAffinity exists.
if affinity.PodAffinity != nil {
if !checker.NodeMatchesHardPodAffinity(pod, allPods, node, affinity.PodAffinity) {
return false, ErrPodAffinityNotMatch
}
}

// Hard inter-pod anti-affinity is symmetric, so we should always check it.
if !checker.NodeMatchesHardPodAntiAffinity(pod, allPods, node, affinity.PodAntiAffinity) {
return false, ErrPodAffinityNotMatch
}
return false, ErrPodAffinityNotMatch

return true, nil
}

// AnyPodMatchesPodAffinityTerm checks if any of given pods can match the specific podAffinityTerm.
func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, error) {
// First return value indicates whether a matching pod exists on a node that matches the topology key,
// while the second return value indicates whether a matching pod exists anywhere.
// TODO: Do we really need any pod matching, or all pods matching? I think the latter.
func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, bool, error) {
matchingPodExists := false
for _, ep := range allPods {
epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
if err != nil {
return false, err
return false, matchingPodExists, err
}
match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, epNode, node, pod, podAffinityTerm)
if err != nil || match {
return match, err
match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(ep, pod, &podAffinityTerm)
if err != nil {
return false, matchingPodExists, err
}

if match {
matchingPodExists = true
if checker.failureDomains.NodesHaveSameTopologyKey(node, epNode, podAffinityTerm.TopologyKey) {
return true, matchingPodExists, nil
}
}
}
return false, matchingPodExists, nil
}

func getPodAffinityTerms(podAffinity *api.PodAffinity) (terms []api.PodAffinityTerm) {
if podAffinity != nil {
if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return false, nil
return terms
}

func getPodAntiAffinityTerms(podAntiAffinity *api.PodAntiAffinity) (terms []api.PodAffinityTerm) {
if podAntiAffinity != nil {
if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}

// Checks whether the given node has pods which satisfy all the required pod affinity scheduling rules.
// If node has pods which satisfy all the required pod affinity scheduling rules then return true.
func (checker *PodAffinityChecker) NodeMatchesHardPodAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinity *api.PodAffinity) bool {
var podAffinityTerms []api.PodAffinityTerm
if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
podAffinityTerms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// podAffinityTerms = append(podAffinityTerms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}

for _, podAffinityTerm := range podAffinityTerms {
podAffinityTermMatches, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAffinityTerm)
for _, podAffinityTerm := range getPodAffinityTerms(podAffinity) {
podAffinityTermMatches, matchingPodExists, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAffinityTerm)
if err != nil {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, an error ocurred when checking existing pods on the node for PodAffinityTerm %v err: %v",
podName(pod), node.Name, podAffinityTerm, err)
return false
}

if !podAffinityTermMatches {
// TODO: Think about whether this can be simplified once we have controllerRef
// Check if it is in special case that the requiredDuringScheduling affinity requirement can be disregarded.
// If the requiredDuringScheduling affinity requirement matches a pod's own labels and namespace, and there are no other such pods
// anywhere, then disregard the requirement.
// This allows rules like "schedule all of the pods of this collection to the same zone" to not block forever
// because the first pod of the collection can't be scheduled.
names := priorityutil.GetNamespacesFromPodAffinityTerm(pod, podAffinityTerm)
labelSelector, err := unversioned.LabelSelectorAsSelector(podAffinityTerm.LabelSelector)
if err != nil || !names.Has(pod.Namespace) || !labelSelector.Matches(labels.Set(pod.Labels)) {
match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, pod, &podAffinityTerm)
if err != nil || !match || matchingPodExists {
Member: I'm 99% sure this should be !matchingPodExists -- I'm not sure how the tests are passing.

Member (Author): Actually, I think it should be "matchingPodExists" as it is.
Let me clarify. If there is no existing matching pod, we are effectively scheduling the first pod of the collection, so we need to place it somewhere even though no existing pod satisfies its affinity; there simply are no pods matching its selector yet.

However, if some matchingPodExists, then there has to be some pod that satisfies its selector, so if pods match the rules but none of them satisfies the topology key, then scheduling here is bad.

Member: Sorry, I got confused, you're right.

glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because none of the existing pods on this node satisfy the PodAffinityTerm %v, err: %+v",
podName(pod), node.Name, podAffinityTerm, err)
return false
}

// the affinity is to put the pod together with other pods from its same service or controller
filteredPods := priorityutil.FilterPodsByNameSpaces(names, allPods)
for _, filteredPod := range filteredPods {
// if found an existing pod from same service or RC,
// the affinity scheduling rules cannot be disregarded.
if labelSelector.Matches(labels.Set(filteredPod.Labels)) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because none of the existing pods on this node satisfy the PodAffinityTerm %v",
podName(pod), node.Name, podAffinityTerm)
return false
}
}
}
}
// all the required pod affinity scheduling rules satisfied
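
To make the exchange above concrete: AnyPodMatchesPodAffinityTerm now returns two booleans, the first saying whether a matching pod exists on a node that shares the topology key value, the second whether a matching pod exists anywhere at all. The sketch below is not part of the PR; the function name and parameter names are hypothetical, and it only restates the per-term decision that NodeMatchesHardPodAffinity appears to make with those two values plus a self-match test.

// Hypothetical sketch, not code from this PR: the decision made for each
// required pod-affinity term.
//
//   satisfiedOnTopology - a matching pod exists on a node with the same topology key value
//   matchingPodExists   - a matching pod exists anywhere in the cluster
//   selfMatch           - the incoming pod matches the term's own namespaces and label selector
func termAllowsScheduling(satisfiedOnTopology, matchingPodExists, selfMatch bool) bool {
	if satisfiedOnTopology {
		// The required affinity term is already satisfied in this topology domain.
		return true
	}
	// First-pod special case: if the term matches the pod itself and no other
	// matching pod exists anywhere, the requirement is disregarded so that the
	// first pod of a collection ("put all pods of this service in one zone")
	// is not blocked forever.
	return selfMatch && !matchingPodExists
}
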
@@ -884,21 +912,12 @@ func (checker *PodAffinityChecker) NodeMatchesHardPodAffinity(pod *api.Pod, allP
// scheduling rules and scheduling the pod onto the node won't
// break any existing pods' anti-affinity rules, then return true.
func (checker *PodAffinityChecker) NodeMatchesHardPodAntiAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAntiAffinity *api.PodAntiAffinity) bool {
var podAntiAffinityTerms []api.PodAffinityTerm
if podAntiAffinity != nil && len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
podAntiAffinityTerms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// podAntiAffinityTerms = append(podAntiAffinityTerms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}

// foreach element podAntiAffinityTerm of podAntiAffinityTerms
// if the pod matches the term (breaks the anti-affinity),
// don't schedule the pod onto this node.
for _, podAntiAffinityTerm := range podAntiAffinityTerms {
podAntiAffinityTermMatches, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAntiAffinityTerm)
if err != nil || podAntiAffinityTermMatches == true {
for _, podAntiAffinityTerm := range getPodAntiAffinityTerms(podAntiAffinity) {
podAntiAffinityTermMatches, _, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAntiAffinityTerm)
if err != nil || podAntiAffinityTermMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because not all the existing pods on this node satisfy the PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, podAntiAffinityTerm, err)
return false
@@ -914,32 +933,21 @@ func (checker *PodAffinityChecker) NodeMatchesHardPodAntiAffinity(pod *api.Pod,
glog.V(10).Infof("Failed to get Affinity from Pod %+v, err: %+v", podName(pod), err)
return false
}
if epAffinity.PodAntiAffinity != nil {
var epAntiAffinityTerms []api.PodAffinityTerm
if len(epAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
epAntiAffinityTerms = epAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
if err != nil {
glog.V(10).Infof("Failed to get node from Pod %+v, err: %+v", podName(ep), err)
return false
}
for _, epAntiAffinityTerm := range getPodAntiAffinityTerms(epAffinity.PodAntiAffinity) {
match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, ep, &epAntiAffinityTerm)
if err != nil {
glog.V(10).Infof("Failed to get label selector from anti-affinityterm %+v of existing pod %+v, err: %+v", epAntiAffinityTerm, podName(pod), err)
return false
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(epAffinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// epAntiAffinityTerms = append(epAntiAffinityTerms, epAffinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}

for _, epAntiAffinityTerm := range epAntiAffinityTerms {
labelSelector, err := unversioned.LabelSelectorAsSelector(epAntiAffinityTerm.LabelSelector)
if err != nil {
glog.V(10).Infof("Failed to get label selector from anti-affinityterm %+v of existing pod %+v, err: %+v", epAntiAffinityTerm, podName(pod), err)
return false
}

names := priorityutil.GetNamespacesFromPodAffinityTerm(ep, epAntiAffinityTerm)
if (len(names) == 0 || names.Has(pod.Namespace)) && labelSelector.Matches(labels.Set(pod.Labels)) {
epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
if err != nil || checker.failureDomains.NodesHaveSameTopologyKey(node, epNode, epAntiAffinityTerm.TopologyKey) {
glog.V(10).Infof("Cannot schedule Pod %+v, onto node %v because the pod would break the PodAntiAffinityTerm %+v, of existing pod %+v, err: %v",
podName(pod), node.Name, epAntiAffinityTerm, podName(ep), err)
return false
}
}
if match && checker.failureDomains.NodesHaveSameTopologyKey(node, epNode, epAntiAffinityTerm.TopologyKey) {
glog.V(10).Infof("Cannot schedule Pod %+v, onto node %v because the pod would break the PodAntiAffinityTerm %+v, of existing pod %+v, err: %v",
podName(pod), node.Name, epAntiAffinityTerm, podName(ep), err)
return false
}
}
}
@@ -948,28 +956,6 @@ func (checker *PodAffinityChecker) NodeMatchesHardPodAntiAffinity(pod *api.Pod,
return true
}

// NodeMatchPodAffinityAntiAffinity checks if the node matches
// the requiredDuringScheduling affinity/anti-affinity rules indicated by the pod.
func (checker *PodAffinityChecker) NodeMatchPodAffinityAntiAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node) bool {
// Parse required affinity scheduling rules.
affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
if err != nil {
glog.V(10).Infof("Failed to get Affinity from Pod %+v, err: %+v", podName(pod), err)
return false
}

// check if the current node match the inter-pod affinity scheduling rules.
// hard inter-pod affinity is not symmetric, check only when affinity.PodAffinity is not nil.
if affinity.PodAffinity != nil {
if !checker.NodeMatchesHardPodAffinity(pod, allPods, node, affinity.PodAffinity) {
return false
}
}

// hard inter-pod anti-affinity is symmetric, check both when affinity.PodAntiAffinity is nil and not nil.
return checker.NodeMatchesHardPodAntiAffinity(pod, allPods, node, affinity.PodAntiAffinity)
}

func PodToleratesNodeTaints(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node := nodeInfo.Node()
if node == nil {
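
As a companion to the anti-affinity hunks above, here is a small, self-contained illustration of why the hard anti-affinity check has to be symmetric. Everything in it (fakePod, violatesExistingAntiAffinity, the zone strings) is hypothetical scaffolding, not the scheduler's API; it only mirrors the loop over existing pods' required anti-affinity terms.

package main

import "fmt"

// fakePod is a stand-in for api.Pod: labels, the topology key value of the node
// it runs on (a zone), and its required anti-affinity reduced to a label selector.
type fakePod struct {
	labels       map[string]string
	zone         string
	antiSelector map[string]string
}

// selectorMatches reports whether every key/value pair in selector is present in labels.
func selectorMatches(selector, labels map[string]string) bool {
	if len(selector) == 0 {
		return false
	}
	for k, v := range selector {
		if labels[k] != v {
			return false
		}
	}
	return true
}

// violatesExistingAntiAffinity mirrors the symmetric direction of the check:
// placing the incoming pod in candidateZone is rejected if any existing pod in
// that zone has a required anti-affinity term matching the incoming pod's labels,
// even when the incoming pod declares no anti-affinity of its own.
func violatesExistingAntiAffinity(incoming fakePod, candidateZone string, existing []fakePod) bool {
	for _, ep := range existing {
		if ep.zone == candidateZone && selectorMatches(ep.antiSelector, incoming.labels) {
			return true
		}
	}
	return false
}

func main() {
	sensitive := fakePod{
		labels:       map[string]string{"app": "sensitive"},
		zone:         "zone-a",
		antiSelector: map[string]string{"app": "noisy"},
	}
	noisy := fakePod{labels: map[string]string{"app": "noisy"}}

	fmt.Println(violatesExistingAntiAffinity(noisy, "zone-a", []fakePod{sensitive})) // true: would break sensitive's rule
	fmt.Println(violatesExistingAntiAffinity(noisy, "zone-b", []fakePod{sensitive})) // false: a different zone is fine
}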