Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and optimize preferred (anti) pod affinity #85959

Merged
merged 1 commit into from Dec 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/scheduler/algorithm/priorities/BUILD
Expand Up @@ -45,6 +45,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
Expand Down
238 changes: 140 additions & 98 deletions pkg/scheduler/algorithm/priorities/interpod_affinity.go
Expand Up @@ -23,6 +23,8 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
Expand All @@ -36,47 +38,67 @@ import (
type topologyPairToScore map[string]map[string]int64

type podAffinityPriorityMap struct {
// nodes contain all nodes that should be considered.
nodes []*v1.Node
// tracks a topology pair score so far.
topologyScore topologyPairToScore
topologyScore topologyPairToScore
affinityTerms []*weightedAffinityTerm
antiAffinityTerms []*weightedAffinityTerm
hardPodAffinityWeight int32
sync.Mutex
}

func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap {
return &podAffinityPriorityMap{
nodes: nodes,
topologyScore: make(topologyPairToScore),
}
// A "processed" representation of v1.WeightedAffinityTerm.
type weightedAffinityTerm struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth adding a comment indicating that this the representation of v1.WeightedPodAffinityTerm

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

namespaces sets.String
selector labels.Selector
weight int32
topologyKey string
}

func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) error {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term)
func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return err
return nil, err
}
return &weightedAffinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey, weight: weight}, nil
}

func getProcessedTerms(pod *v1.Pod, terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) {
if terms == nil {
return nil, nil
}

var processedTerms []*weightedAffinityTerm
for i := range terms {
p, err := newWeightedAffinityTerm(pod, &terms[i].PodAffinityTerm, terms[i].Weight)
if err != nil {
return nil, err
}
processedTerms = append(processedTerms, p)
}
return processedTerms, nil
}

func (p *podAffinityPriorityMap) processTerm(term *weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
if len(fixedNode.Labels) == 0 {
return nil
}

match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector)
tpValue, tpValueExist := fixedNode.Labels[term.TopologyKey]
match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector)
tpValue, tpValueExist := fixedNode.Labels[term.topologyKey]
if match && tpValueExist {
p.Lock()
if p.topologyScore[term.TopologyKey] == nil {
p.topologyScore[term.TopologyKey] = make(map[string]int64)
if p.topologyScore[term.topologyKey] == nil {
p.topologyScore[term.topologyKey] = make(map[string]int64)
}
p.topologyScore[term.TopologyKey][tpValue] += weight
p.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
p.Unlock()
}
return nil
}

func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
for i := range terms {
term := &terms[i]
if err := p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))); err != nil {
func (p *podAffinityPriorityMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
for _, term := range terms {
if err := p.processTerm(term, podToCheck, fixedNode, multiplier); err != nil {
return err
}
}
Expand Down Expand Up @@ -143,6 +165,75 @@ func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, shar
return nil
}

func (p *podAffinityPriorityMap) processExistingPod(existingPod *v1.Pod, existingPodNodeInfo *schedulernodeinfo.NodeInfo, incomingPod *v1.Pod) error {
existingPodAffinity := existingPod.Spec.Affinity
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
existingPodNode := existingPodNodeInfo.Node()

// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPods>`s node by the term`s weight.
if err := p.processTerms(p.affinityTerms, existingPod, existingPodNode, 1); err != nil {
return err
}

// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>`s node by the term`s weight.
if err := p.processTerms(p.antiAffinityTerms, existingPod, existingPodNode, -1); err != nil {
return err
}

if existingHasAffinityConstraints {
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
if p.hardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for i := range terms {
term := &terms[i]
processedTerm, err := newWeightedAffinityTerm(existingPod, term, p.hardPodAffinityWeight)
if err != nil {
return err
}
if err := p.processTerm(processedTerm, incomingPod, existingPodNode, 1); err != nil {
return err
}
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
if err != nil {
klog.Error(err)
return nil
}

if err := p.processTerms(terms, incomingPod, existingPodNode, 1); err != nil {
return err
}
}
if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
if err != nil {
return err
}
if err := p.processTerms(terms, incomingPod, existingPodNode, -1); err != nil {
return err
}
}
return nil
}

func buildTopologyPairToScore(
pod *v1.Pod,
sharedLister schedulerlisters.SharedLister,
Expand All @@ -158,9 +249,8 @@ func buildTopologyPairToScore(
hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil

// pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node.
pm := newPodAffinityPriorityMap(filteredNodes)

// Unless the pod being scheduled has affinity terms, we only
// need to process nodes hosting pods with affinity.
allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil {
klog.Errorf("get pods with affinity list error, err: %v", err)
Expand All @@ -174,93 +264,45 @@ func buildTopologyPairToScore(
}
}

processPod := func(existingPod *v1.Pod) error {
existingPodNodeInfo, err := sharedLister.NodeInfos().Get(existingPod.Spec.NodeName)
if err != nil {
klog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
var affinityTerms []*weightedAffinityTerm
var antiAffinityTerms []*weightedAffinityTerm
if hasAffinityConstraints {
if affinityTerms, err = getProcessedTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return nil
}
existingPodAffinity := existingPod.Spec.Affinity
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
existingPodNode := existingPodNodeInfo.Node()

if hasAffinityConstraints {
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPods>`s node by the term`s weight.
terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := pm.processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil {
return err
}
}
if hasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>`s node by the term`s weight.
terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := pm.processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil {
return err
}
}
if hasAntiAffinityConstraints {
if antiAffinityTerms, err = getProcessedTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return nil
}
}

if existingHasAffinityConstraints {
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
if hardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, term := range terms {
if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(hardPodAffinityWeight)); err != nil {
return err
}
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := pm.processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil {
return err
}
}
if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := pm.processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil {
return err
}
}
return nil
pm := podAffinityPriorityMap{
topologyScore: make(topologyPairToScore),
affinityTerms: affinityTerms,
antiAffinityTerms: antiAffinityTerms,
hardPodAffinityWeight: hardPodAffinityWeight,
}

errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
processNode := func(i int) {
nodeInfo := allNodes[i]
if nodeInfo.Node() != nil {
// Unless the pod being scheduled has affinity terms, we only
// need to process pods with affinity in the node.
podsToProcess := nodeInfo.PodsWithAffinity()
if hasAffinityConstraints || hasAntiAffinityConstraints {
// We need to process all the pods.
for _, existingPod := range nodeInfo.Pods() {
if err := processPod(existingPod); err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
}
} else {
// The pod doesn't have any constraints - we need to check only existing
// ones that have some.
for _, existingPod := range nodeInfo.PodsWithAffinity() {
if err := processPod(existingPod); err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
podsToProcess = nodeInfo.Pods()
}

for _, existingPod := range podsToProcess {
if err := pm.processExistingPod(existingPod, nodeInfo, pod); err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
}
}
Expand Down