Use ErrorChannel to communicate errors during parallel execution in interpod_affinity #80588

Merged
merged 1 commit on Jul 30, 2019
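For context: the commit below drops the mutex-guarded firstError field from podAffinityPriorityMap and reports the first failure through a schedutil.ErrorChannel instead, cancelling the shared context so the remaining workers stop early. A minimal sketch of that pattern, assuming a buffered channel of size one (the upstream type in pkg/scheduler/util may differ in detail):

package util

import "context"

// ErrorChannel records the first error reported by concurrent workers.
type ErrorChannel struct {
    errCh chan error
}

// NewErrorChannel returns a channel that can hold a single error.
func NewErrorChannel() *ErrorChannel {
    return &ErrorChannel{errCh: make(chan error, 1)}
}

// SendError stores err if no error has been stored yet; later calls are
// dropped without blocking.
func (e *ErrorChannel) SendError(err error) {
    select {
    case e.errCh <- err:
    default:
    }
}

// SendErrorWithCancel stores err and cancels the shared context so that
// callers such as workqueue.ParallelizeUntil stop handing out further work.
func (e *ErrorChannel) SendErrorWithCancel(err error, cancel context.CancelFunc) {
    e.SendError(err)
    cancel()
}

// ReceiveError returns the stored error, or nil if none was reported.
func (e *ErrorChannel) ReceiveError() error {
    select {
    case err := <-e.errCh:
        return err
    default:
        return nil
    }
}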
60 changes: 32 additions & 28 deletions pkg/scheduler/algorithm/priorities/interpod_affinity.go
@@ -18,7 +18,6 @@ package priorities

import (
"context"
"sync"
"sync/atomic"

"k8s.io/api/core/v1"
@@ -30,6 +29,7 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"

"k8s.io/klog"
)
@@ -58,15 +58,11 @@ func NewInterPodAffinityPriority(
}

type podAffinityPriorityMap struct {
sync.Mutex

// nodes contain all nodes that should be considered
nodes []*v1.Node
// counts store the mapping from node name to so-far computed score of
// the node.
counts map[string]*int64
// The first error that we faced.
firstError error
}

func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap {
@@ -76,20 +72,11 @@ func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap {
}
}

func (p *podAffinityPriorityMap) setError(err error) {
p.Lock()
defer p.Unlock()
if p.firstError == nil {
p.firstError = err
}
}

func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) {
func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) error {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
p.setError(err)
return
return err
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector)
if match {
@@ -99,13 +86,17 @@ func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefini
}
}
}
return nil
}

func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
for i := range terms {
term := &terms[i]
p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier)))
if err := p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))); err != nil {
return err
}
}
return nil
}

// CalculateInterPodAffinityPriority computes a sum by iterating through the elements of weightedPodAffinityTerm and adding
@@ -152,14 +143,18 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, pod, existingPod, existingPodNode, 1)
if err := pm.processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil {
return err
}
}
if hasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, pod, existingPod, existingPodNode, -1)
if err := pm.processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil {
return err
}
}

if existingHasAffinityConstraints {
@@ -173,48 +168,57 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, term := range terms {
pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight))
if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)); err != nil {
return err
}
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, existingPod, pod, existingPodNode, 1)
if err := pm.processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil {
return err
}
}
if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, existingPod, pod, existingPodNode, -1)
if err := pm.processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil {
return err
}
}
return nil
}

errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
processNode := func(i int) {
nodeInfo := nodeNameToInfo[allNodeNames[i]]
if nodeInfo.Node() != nil {
if hasAffinityConstraints || hasAntiAffinityConstraints {
// We need to process all the pods.
for _, existingPod := range nodeInfo.Pods() {
if err := processPod(existingPod); err != nil {
pm.setError(err)
errCh.SendErrorWithCancel(err, cancel)
}
}
} else {
// The pod doesn't have any constraints - we need to check only existing
// ones that have some.
for _, existingPod := range nodeInfo.PodsWithAffinity() {
if err := processPod(existingPod); err != nil {
pm.setError(err)
errCh.SendErrorWithCancel(err, cancel)
}
}
}
}
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
if pm.firstError != nil {
return nil, pm.firstError
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
}

for _, node := range nodes {
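The end-to-end wiring used by the priority function can be exercised outside the scheduler as well. A hedged, self-contained usage sketch (the node list and doWork helper are hypothetical, and the import paths assume the Kubernetes tree of this era): only the first error is kept, and the cancelled context lets ParallelizeUntil skip the pieces that have not started yet.

package main

import (
    "context"
    "fmt"

    "k8s.io/client-go/util/workqueue"
    schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

// doWork stands in for the per-node scoring done by processNode in the diff
// above (hypothetical helper).
func doWork(name string) error {
    if name == "node-b" {
        return fmt.Errorf("failed to score %s", name)
    }
    return nil
}

func main() {
    nodes := []string{"node-a", "node-b", "node-c"}

    errCh := schedutil.NewErrorChannel()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    processNode := func(i int) {
        if err := doWork(nodes[i]); err != nil {
            // Record the first error and cancel so pending pieces are skipped.
            errCh.SendErrorWithCancel(err, cancel)
        }
    }

    // 16 workers, matching the fan-out used by CalculateInterPodAffinityPriority.
    workqueue.ParallelizeUntil(ctx, 16, len(nodes), processNode)

    if err := errCh.ReceiveError(); err != nil {
        fmt.Println("first error:", err)
    }
}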