Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Priorities use SharedLister interface instead of NodeInfo Map #84449

Merged
merged 1 commit into from Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/scheduler/BUILD
Expand Up @@ -86,6 +86,7 @@ go_test(
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
Expand Down
Expand Up @@ -27,7 +27,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

// getExistingVolumeCountForNode gets the current number of volumes on node.
Expand Down Expand Up @@ -401,17 +401,17 @@ func TestBalancedResourceAllocation(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
if len(test.pod.Spec.Volumes) > 0 {
maxVolumes := 5
for _, info := range nodeNameToInfo {
for _, info := range snapshot.NodeInfoMap {
info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods(), maxVolumes)
info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
}
}
function := priorityFunction(BalancedResourceAllocationMap, nil, nil)

list, err := function(test.pod, nodeNameToInfo, test.nodes)
list, err := function(test.pod, snapshot, test.nodes)

if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down
18 changes: 9 additions & 9 deletions pkg/scheduler/algorithm/priorities/even_pods_spread.go
Expand Up @@ -25,7 +25,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"

"k8s.io/klog"
Expand Down Expand Up @@ -82,26 +82,26 @@ func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node)
// Note: Symmetry is not applicable. We only weigh how incomingPod matches existingPod.
// Whether existingPod matches incomingPod doesn't contribute to the final score.
// This is different from the Affinity API.
func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := make(framework.NodeScoreList, len(nodes))
// return if incoming pod doesn't have soft topology spread constraints.
constraints := getSoftTopologySpreadConstraints(pod)
if len(constraints) == 0 {
return result, nil
}

allNodes, err := sharedLister.NodeInfos().List()
if err != nil {
return nil, err
}

t := newTopologySpreadConstraintsMap()
t.initialize(pod, nodes)

allNodeNames := make([]string, 0, len(nodeNameToInfo))
for name := range nodeNameToInfo {
allNodeNames = append(allNodeNames, name)
}

errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
processAllNode := func(i int) {
nodeInfo := nodeNameToInfo[allNodeNames[i]]
nodeInfo := allNodes[i]
node := nodeInfo.Node()
if node == nil {
return
Expand Down Expand Up @@ -136,7 +136,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
atomic.AddInt32(t.topologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processAllNode)
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
Expand Up @@ -22,7 +22,7 @@ import (

v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)

Expand Down Expand Up @@ -434,9 +434,8 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
allNodes := append([]*v1.Node{}, tt.nodes...)
allNodes = append(allNodes, tt.failedNodes...)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, allNodes)

got, _ := CalculateEvenPodsSpreadPriority(tt.pod, nodeNameToInfo, tt.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, allNodes)
got, _ := CalculateEvenPodsSpreadPriority(tt.pod, snapshot, tt.nodes)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("CalculateEvenPodsSpreadPriority() = %#v, want %#v", got, tt.want)
}
Expand Down Expand Up @@ -484,10 +483,10 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(existingPods, allNodes)
snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
b.ResetTimer()
for i := 0; i < b.N; i++ {
CalculateEvenPodsSpreadPriority(tt.pod, nodeNameToInfo, filteredNodes)
CalculateEvenPodsSpreadPriority(tt.pod, snapshot, filteredNodes)
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/algorithm/priorities/image_locality_test.go
Expand Up @@ -25,7 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/util/parsers"
)

Expand Down Expand Up @@ -184,8 +184,8 @@ func TestImageLocalityPriority(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(ImageLocalityPriorityMap, nil, &priorityMetadata{totalNumNodes: len(test.nodes)})(test.pod, nodeNameToInfo, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
list, err := priorityFunction(ImageLocalityPriorityMap, nil, &priorityMetadata{totalNumNodes: len(test.nodes)})(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/scheduler/algorithm/priorities/interpod_affinity.go
Expand Up @@ -26,22 +26,19 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"

"k8s.io/klog"
)

// InterPodAffinity contains information to calculate inter pod affinity.
type InterPodAffinity struct {
nodeInfoLister schedulerlisters.NodeInfoLister
hardPodAffinityWeight int32
}

// NewInterPodAffinityPriority creates an InterPodAffinity.
func NewInterPodAffinityPriority(nodeInfoLister schedulerlisters.NodeInfoLister, hardPodAffinityWeight int32) PriorityFunction {
func NewInterPodAffinityPriority(hardPodAffinityWeight int32) PriorityFunction {
interPodAffinity := &InterPodAffinity{
nodeInfoLister: nodeInfoLister,
hardPodAffinityWeight: hardPodAffinityWeight,
}
return interPodAffinity.CalculateInterPodAffinityPriority
Expand Down Expand Up @@ -102,14 +99,14 @@ func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm
// that node; the node(s) with the highest sum are the most preferred.
// Symmetry need to be considered for preferredDuringSchedulingIgnoredDuringExecution from podAffinity & podAntiAffinity,
// symmetry need to be considered for hard requirements from podAffinity
func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
affinity := pod.Spec.Affinity
hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil

// pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node.
pm := newPodAffinityPriorityMap(nodes)
allNodes, err := ipa.nodeInfoLister.List()
allNodes, err := sharedLister.NodeInfos().List()
if err != nil {
return nil, err
}
Expand All @@ -118,7 +115,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
var maxCount, minCount int64

processPod := func(existingPod *v1.Pod) error {
existingPodNodeInfo, err := ipa.nodeInfoLister.Get(existingPod.Spec.NodeName)
existingPodNodeInfo, err := sharedLister.NodeInfos().Get(existingPod.Spec.NodeName)
if err != nil {
klog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
return nil
Expand Down
9 changes: 3 additions & 6 deletions pkg/scheduler/algorithm/priorities/interpod_affinity_test.go
Expand Up @@ -516,10 +516,9 @@ func TestInterPodAffinityPriority(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
interPodAffinity := InterPodAffinity{
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
}
list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, snapshot.NodeInfoMap, test.nodes)
list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -604,10 +603,9 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
ipa := InterPodAffinity{
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: test.hardPodAffinityWeight,
}
list, err := ipa.CalculateInterPodAffinityPriority(test.pod, snapshot.NodeInfoMap, test.nodes)
list, err := ipa.CalculateInterPodAffinityPriority(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -661,12 +659,11 @@ func BenchmarkInterPodAffinityPriority(b *testing.B) {
existingPods, allNodes := tt.prepFunc(tt.existingPodsNum, tt.allNodesNum)
snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
interPodAffinity := InterPodAffinity{
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, snapshot.NodeInfoMap, allNodes)
interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, snapshot, allNodes)
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/algorithm/priorities/least_requested_test.go
Expand Up @@ -24,7 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

func TestLeastRequested(t *testing.T) {
Expand Down Expand Up @@ -253,8 +253,8 @@ func TestLeastRequested(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(LeastRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
list, err := priorityFunction(LeastRequestedPriorityMap, nil, nil)(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/scheduler/algorithm/priorities/metadata.go
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

Expand Down Expand Up @@ -61,19 +62,25 @@ type priorityMetadata struct {
}

// PriorityMetadata is a PriorityMetadataProducer. Node info can be nil.
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) interface{} {
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
}
totalNumNodes := 0
if sharedLister != nil {
if l, err := sharedLister.NodeInfos().List(); err == nil {
totalNumNodes = len(l)
}
}
return &priorityMetadata{
podLimits: getResourceLimits(pod),
podTolerations: getAllTolerationPreferNoSchedule(pod.Spec.Tolerations),
affinity: pod.Spec.Affinity,
podSelectors: getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
controllerRef: metav1.GetControllerOf(pod),
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
totalNumNodes: len(nodeNameToInfo),
totalNumNodes: totalNumNodes,
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/algorithm/priorities/most_requested_test.go
Expand Up @@ -24,7 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

func TestMostRequested(t *testing.T) {
Expand Down Expand Up @@ -210,8 +210,8 @@ func TestMostRequested(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(MostRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
list, err := priorityFunction(MostRequestedPriorityMap, nil, nil)(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/algorithm/priorities/node_affinity_test.go
Expand Up @@ -23,7 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

func TestNodeAffinityPriority(t *testing.T) {
Expand Down Expand Up @@ -167,9 +167,9 @@ func TestNodeAffinityPriority(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce, nil)
list, err := nap(test.pod, nodeNameToInfo, test.nodes)
list, err := nap(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/algorithm/priorities/node_label_test.go
Expand Up @@ -23,7 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

func TestNewNodeLabelPriority(t *testing.T) {
Expand Down Expand Up @@ -107,12 +107,12 @@ func TestNewNodeLabelPriority(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
labelPrioritizer := &NodeLabelPrioritizer{
label: test.label,
presence: test.presence,
}
list, err := priorityFunction(labelPrioritizer.CalculateNodeLabelPriorityMap, nil, nil)(nil, nodeNameToInfo, test.nodes)
list, err := priorityFunction(labelPrioritizer.CalculateNodeLabelPriorityMap, nil, nil)(nil, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
Expand Up @@ -23,7 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

func TestNodePreferAvoidPriority(t *testing.T) {
Expand Down Expand Up @@ -141,8 +141,8 @@ func TestNodePreferAvoidPriority(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil, nil)(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/algorithm/priorities/reduce.go
Expand Up @@ -19,7 +19,7 @@ package priorities
import (
"k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
)

// NormalizeReduce generates a PriorityReduceFunction that can normalize the result
Expand All @@ -29,7 +29,7 @@ func NormalizeReduce(maxPriority int64, reverse bool) PriorityReduceFunction {
return func(
_ *v1.Pod,
_ interface{},
_ map[string]*schedulernodeinfo.NodeInfo,
_ schedulerlisters.SharedLister,
result framework.NodeScoreList) error {

var maxCount int64
Expand Down