Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #94059: Track pods with required anti-affinity #94804

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/scheduler/framework/plugins/interpodaffinity/filtering.go
Expand Up @@ -163,25 +163,25 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
// (1) Whether it has PodAntiAffinity
// (2) Whether any AffinityTerm matches the incoming pod
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount {
topoMaps := make([]topologyToMatchedTermCount, len(allNodes))
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
topoMaps := make([]topologyToMatchedTermCount, len(nodes))
index := int32(-1)
processNode := func(i int) {
nodeInfo := allNodes[i]
nodeInfo := nodes[i]
node := nodeInfo.Node()
if node == nil {
klog.Error("node not found")
return
}
topoMap := make(topologyToMatchedTermCount)
for _, existingPod := range nodeInfo.PodsWithAffinity {
for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1)
}
if len(topoMap) != 0 {
topoMaps[atomic.AddInt32(&index, 1)] = topoMap
}
}
parallelize.Until(context.Background(), len(allNodes), processNode)
parallelize.Until(context.Background(), len(nodes), processNode)

result := make(topologyToMatchedTermCount)
for i := 0; i <= int(index); i++ {
Expand Down Expand Up @@ -241,12 +241,12 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, al
// PreFilter invoked at the prefilter extension point.
func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
var allNodes []*framework.NodeInfo
var havePodsWithAffinityNodes []*framework.NodeInfo
var nodesWithRequiredAntiAffinityPods []*framework.NodeInfo
var err error
if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos: %v", err))
}
if havePodsWithAffinityNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList(); err != nil {
if nodesWithRequiredAntiAffinityPods, err = pl.sharedLister.NodeInfos().HavePodsWithRequiredAntiAffinityList(); err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos with pods with affinity: %v", err))
}

Expand All @@ -256,7 +256,7 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
}

// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes)
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, nodesWithRequiredAntiAffinityPods)

// incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
// incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/framework/v1alpha1/fake/listers.go
Expand Up @@ -245,6 +245,12 @@ func (nodes NodeInfoLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, e
return nodes, nil
}

// HavePodsWithRequiredAntiAffinityList is supposed to list nodes with at least one pod with
// required anti-affinity. For the fake lister we just return everything.
func (nodes NodeInfoLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
return nodes, nil
}

// NewNodeInfoLister create a new fake NodeInfoLister from a slice of v1.Nodes.
func NewNodeInfoLister(nodes []*v1.Node) framework.NodeInfoLister {
nodeInfoList := make([]*framework.NodeInfo, len(nodes))
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/framework/v1alpha1/listers.go
Expand Up @@ -22,6 +22,8 @@ type NodeInfoLister interface {
List() ([]*NodeInfo, error)
// Returns the list of NodeInfos of nodes with pods with affinity terms.
HavePodsWithAffinityList() ([]*NodeInfo, error)
// Returns the list of NodeInfos of nodes with pods with required anti-affinity terms.
HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error)
// Returns the NodeInfo of the given node name.
Get(nodeName string) (*NodeInfo, error)
}
Expand Down
60 changes: 46 additions & 14 deletions pkg/scheduler/framework/v1alpha1/types.go
Expand Up @@ -196,6 +196,9 @@ type NodeInfo struct {
// The subset of pods with affinity.
PodsWithAffinity []*PodInfo

// The subset of pods with required anti-affinity.
PodsWithRequiredAntiAffinity []*PodInfo

// Ports allocated on the node.
UsedPorts HostPortInfo

Expand Down Expand Up @@ -457,6 +460,9 @@ func (n *NodeInfo) Clone() *NodeInfo {
if len(n.PodsWithAffinity) > 0 {
clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...)
}
if len(n.PodsWithRequiredAntiAffinity) > 0 {
clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...)
}
return clone
}

Expand Down Expand Up @@ -486,44 +492,67 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) {
n.NonZeroRequested.MilliCPU += non0CPU
n.NonZeroRequested.Memory += non0Mem
n.Pods = append(n.Pods, podInfo)
affinity := pod.Spec.Affinity
if affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) {
if podWithAffinity(pod) {
n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
}
if podWithRequiredAntiAffinity(pod) {
n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
}

// Consume ports when pods added.
n.updateUsedPorts(podInfo.Pod, true)

n.Generation = nextGeneration()
}

// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
k1, err := GetPodKey(pod)
if err != nil {
return err
}
func podWithAffinity(p *v1.Pod) bool {
affinity := p.Spec.Affinity
return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
}

func podWithRequiredAntiAffinity(p *v1.Pod) bool {
affinity := p.Spec.Affinity
return affinity != nil && affinity.PodAntiAffinity != nil &&
len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0
}

for i := range n.PodsWithAffinity {
k2, err := GetPodKey(n.PodsWithAffinity[i].Pod)
func removeFromSlice(s []*PodInfo, k string) []*PodInfo {
for i := range s {
k2, err := GetPodKey(s[i].Pod)
if err != nil {
klog.Errorf("Cannot get pod key, err: %v", err)
continue
}
if k1 == k2 {
if k == k2 {
// delete the element
n.PodsWithAffinity[i] = n.PodsWithAffinity[len(n.PodsWithAffinity)-1]
n.PodsWithAffinity = n.PodsWithAffinity[:len(n.PodsWithAffinity)-1]
s[i] = s[len(s)-1]
s = s[:len(s)-1]
break
}
}
return s
}

// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
k, err := GetPodKey(pod)
if err != nil {
return err
}
if podWithAffinity(pod) {
n.PodsWithAffinity = removeFromSlice(n.PodsWithAffinity, k)
}
if podWithRequiredAntiAffinity(pod) {
n.PodsWithRequiredAntiAffinity = removeFromSlice(n.PodsWithRequiredAntiAffinity, k)
}

for i := range n.Pods {
k2, err := GetPodKey(n.Pods[i].Pod)
if err != nil {
klog.Errorf("Cannot get pod key, err: %v", err)
continue
}
if k1 == k2 {
if k == k2 {
// delete the element
n.Pods[i] = n.Pods[len(n.Pods)-1]
n.Pods = n.Pods[:len(n.Pods)-1]
Expand Down Expand Up @@ -558,6 +587,9 @@ func (n *NodeInfo) resetSlicesIfEmpty() {
if len(n.PodsWithAffinity) == 0 {
n.PodsWithAffinity = nil
}
if len(n.PodsWithRequiredAntiAffinity) == 0 {
n.PodsWithRequiredAntiAffinity = nil
}
if len(n.Pods) == 0 {
n.Pods = nil
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/scheduler/internal/cache/cache.go
Expand Up @@ -213,6 +213,10 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
// status from having pods with affinity to NOT having pods with affinity or the other
// way around.
updateNodesHavePodsWithAffinity := false
// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with required anti-affinity to NOT having pods with required
// anti-affinity or the other way around.
updateNodesHavePodsWithRequiredAntiAffinity := false

// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
Expand All @@ -239,6 +243,9 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
updateNodesHavePodsWithAffinity = true
}
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
updateNodesHavePodsWithRequiredAntiAffinity = true
}
// We need to preserve the original pointer of the NodeInfo struct since it
// is used in the NodeInfoList, which we may not update.
*existing = *clone
Expand All @@ -254,7 +261,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
updateAllLists = true
}

if updateAllLists || updateNodesHavePodsWithAffinity {
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
}

Expand All @@ -276,6 +283,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {

func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
if updateAll {
// Take a snapshot of the nodes order in the tree
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
Expand All @@ -287,6 +295,9 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
if len(n.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
}
if len(n.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, n)
}
} else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
}
Expand All @@ -296,6 +307,9 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
if len(n.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
}
if len(n.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, n)
}
}
}
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/scheduler/internal/cache/snapshot.go
Expand Up @@ -33,7 +33,10 @@ type Snapshot struct {
nodeInfoList []*framework.NodeInfo
// havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms.
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
generation int64
// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
// required anti-affinity terms.
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
generation int64
}

var _ framework.SharedLister = &Snapshot{}
Expand All @@ -50,17 +53,22 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
nodeInfoMap := createNodeInfoMap(pods, nodes)
nodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
havePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
havePodsWithRequiredAntiAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
for _, v := range nodeInfoMap {
nodeInfoList = append(nodeInfoList, v)
if len(v.PodsWithAffinity) > 0 {
havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v)
}
if len(v.PodsWithRequiredAntiAffinity) > 0 {
havePodsWithRequiredAntiAffinityNodeInfoList = append(havePodsWithRequiredAntiAffinityNodeInfoList, v)
}
}

s := NewEmptySnapshot()
s.nodeInfoMap = nodeInfoMap
s.nodeInfoList = nodeInfoList
s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList

return s
}
Expand Down Expand Up @@ -137,11 +145,17 @@ func (s *Snapshot) List() ([]*framework.NodeInfo, error) {
return s.nodeInfoList, nil
}

// HavePodsWithAffinityList returns the list of nodes with at least one pods with inter-pod affinity
// HavePodsWithAffinityList returns the list of nodes with at least one pod with inter-pod affinity
func (s *Snapshot) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
return s.havePodsWithAffinityNodeInfoList, nil
}

// HavePodsWithRequiredAntiAffinityList returns the list of nodes with at least one pod with
// required inter-pod anti-affinity
func (s *Snapshot) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
return s.havePodsWithRequiredAntiAffinityNodeInfoList, nil
}

// Get returns the NodeInfo of the given node name.
func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
if v, ok := s.nodeInfoMap[nodeName]; ok && v.Node() != nil {
Expand Down