Merge pull request #81068 from Huang-Wei/eps-structure-optimize
scheduler: internal data structure optimization
k8s-ci-robot committed Aug 21, 2019
2 parents d935e06 + 8f559ea commit e7ecb22
Showing 8 changed files with 580 additions and 652 deletions.
271 changes: 125 additions & 146 deletions pkg/scheduler/algorithm/predicates/metadata.go

Large diffs are not rendered by default.

780 changes: 350 additions & 430 deletions pkg/scheduler/algorithm/predicates/metadata_test.go

Large diffs are not rendered by default.

23 changes: 11 additions & 12 deletions pkg/scheduler/algorithm/predicates/predicates.go
@@ -1796,15 +1796,15 @@ func EvenPodsSpreadPredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *sche
return true, nil, nil
}

var topologyPairsPodSpreadMap *topologyPairsPodSpreadMap
var podSpreadCache *podSpreadCache
if predicateMeta, ok := meta.(*predicateMetadata); ok {
topologyPairsPodSpreadMap = predicateMeta.topologyPairsPodSpreadMap
podSpreadCache = predicateMeta.podSpreadCache
} else { // We don't have precomputed metadata. We have to follow a slow path to check spread constraints.
// TODO(Huang-Wei): get it implemented
return false, nil, errors.New("metadata not pre-computed for EvenPodsSpreadPredicate")
}

if topologyPairsPodSpreadMap == nil || len(topologyPairsPodSpreadMap.topologyKeyToMinPodsMap) == 0 {
if podSpreadCache == nil || len(podSpreadCache.tpPairToMatchNum) == 0 {
return true, nil, nil
}

@@ -1821,25 +1821,24 @@ func EvenPodsSpreadPredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *sche
if err != nil {
return false, nil, err
}
selfMatchNum := 0
selfMatchNum := int32(0)
if selfMatch {
selfMatchNum = 1
}

pair := topologyPair{key: tpKey, value: tpVal}
minMatchNum, ok := topologyPairsPodSpreadMap.topologyKeyToMinPodsMap[tpKey]
paths, ok := podSpreadCache.tpKeyToCriticalPaths[tpKey]
if !ok {
// error which should not happen
klog.Errorf("internal error: get minMatchNum from key %q of %#v", tpKey, topologyPairsPodSpreadMap.topologyKeyToMinPodsMap)
klog.Errorf("internal error: get paths from key %q of %#v", tpKey, podSpreadCache.tpKeyToCriticalPaths)
continue
}
// judging criteria:
// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
matchNum := len(topologyPairsPodSpreadMap.topologyPairToPods[pair])

// cast to int to avoid potential overflow.
skew := matchNum + selfMatchNum - int(minMatchNum)
if skew > int(constraint.MaxSkew) {
minMatchNum := paths[0].matchNum
matchNum := podSpreadCache.tpPairToMatchNum[pair]
skew := matchNum + selfMatchNum - minMatchNum
if skew > constraint.MaxSkew {
klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: matchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, constraint.MaxSkew)
return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
}
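The core of the optimization is visible in this hunk: the old topologyPairsPodSpreadMap kept full per-pair pod lists (topologyPairToPods), while the new podSpreadCache keeps only int32 match counts per topology pair plus precomputed "critical paths" (minimum counts) per topology key. The sketch below reconstructs the shape of those structures from what this hunk shows; since metadata.go is not rendered above, the [2]criticalPath layout and any field besides matchNum are assumptions, not the committed definitions.

package main

import "fmt"

// topologyPair identifies one value of a topology key, e.g. {zone, zone1}.
type topologyPair struct {
	key   string
	value string
}

// criticalPath is assumed to carry the topology value with the lowest match
// count; the hunk above only shows paths[0].matchNum being read.
type criticalPath struct {
	topologyValue string
	matchNum      int32
}

// podSpreadCache holds per-pair match counts and, per topology key, the
// precomputed minimum(s) — so the predicate no longer scans pod lists.
type podSpreadCache struct {
	tpKeyToCriticalPaths map[string][2]criticalPath
	tpPairToMatchNum     map[topologyPair]int32
}

func main() {
	cache := podSpreadCache{
		tpKeyToCriticalPaths: map[string][2]criticalPath{
			"zone": {{topologyValue: "zone2", matchNum: 1}, {topologyValue: "zone1", matchNum: 3}},
		},
		tpPairToMatchNum: map[topologyPair]int32{
			{key: "zone", value: "zone1"}: 3,
			{key: "zone", value: "zone2"}: 1,
		},
	}

	// Judging criterion from the hunk:
	// matchNum + selfMatchNum - minMatchNum <= maxSkew.
	// Candidate node in zone1; the incoming pod matches its own selector.
	var selfMatchNum, maxSkew int32 = 1, 1
	pair := topologyPair{key: "zone", value: "zone1"}
	minMatchNum := cache.tpKeyToCriticalPaths["zone"][0].matchNum
	skew := cache.tpPairToMatchNum[pair] + selfMatchNum - minMatchNum
	fmt.Printf("skew=%d, fits=%v\n", skew, skew <= maxSkew) // skew=3, fits=false
}
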
20 changes: 10 additions & 10 deletions pkg/scheduler/algorithm/priorities/even_pods_spread.go
@@ -38,23 +38,23 @@ type topologyPair struct {

type topologySpreadConstraintsMap struct {
// nodeNameToPodCounts is keyed with node name, and valued with the number of matching pods.
nodeNameToPodCounts map[string]int64
nodeNameToPodCounts map[string]int32
// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
topologyPairToPodCounts map[topologyPair]*int64
topologyPairToPodCounts map[topologyPair]*int32
}

func newTopologySpreadConstraintsMap() *topologySpreadConstraintsMap {
return &topologySpreadConstraintsMap{
nodeNameToPodCounts: make(map[string]int64),
topologyPairToPodCounts: make(map[topologyPair]*int64),
nodeNameToPodCounts: make(map[string]int32),
topologyPairToPodCounts: make(map[topologyPair]*int32),
}
}

// Note: the <nodes> passed in are the "filtered" nodes which have passed Predicates.
// This function iterates <nodes> to filter out the nodes which don't have required topologyKey(s),
// and initialize two maps:
// 1) t.topologyPairToPodCounts: keyed with both eligible topology pair and node names.
// 2) t.nodeNameToPodCounts: keyed with node name, and valued with a *int64 pointer for eligible node only.
// 2) t.nodeNameToPodCounts: keyed with node name, and valued with a *int32 pointer for eligible node only.
func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) {
constraints := getSoftTopologySpreadConstraints(pod)
for _, node := range nodes {
@@ -64,7 +64,7 @@ func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node)
for _, constraint := range constraints {
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
if t.topologyPairToPodCounts[pair] == nil {
t.topologyPairToPodCounts[pair] = new(int64)
t.topologyPairToPodCounts[pair] = new(int32)
}
}
t.nodeNameToPodCounts[node.Name] = 0
@@ -122,7 +122,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
}

// <matchSum> indicates how many pods (on current node) match the <constraint>.
matchSum := int64(0)
matchSum := int32(0)
for _, existingPod := range nodeInfo.Pods() {
match, err := predicates.PodMatchesSpreadConstraint(existingPod.Labels, constraint)
if err != nil {
@@ -133,17 +133,17 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
matchSum++
}
}
atomic.AddInt64(t.topologyPairToPodCounts[pair], matchSum)
atomic.AddInt32(t.topologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processAllNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
}

var minCount int64 = math.MaxInt64
var minCount int32 = math.MaxInt32
// <total> sums up the number of matching pods on each qualified topology pair
var total int64
var total int32
for _, node := range nodes {
if _, ok := t.nodeNameToPodCounts[node.Name]; !ok {
continue
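The priority side applies the same int64-to-int32 shrink. Because CalculateEvenPodsSpreadPriority counts matching pods across nodes in parallel (workqueue.ParallelizeUntil with 16 workers), the per-pair counters are *int32 pointers allocated up front, so workers can bump them with atomic.AddInt32 without locking the map. A minimal sketch of that pattern, with plain goroutines standing in for the workqueue and made-up match sums:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type topologyPair struct{ key, value string }

func main() {
	// Counters are allocated before the parallel phase (initialize() in the
	// diff); afterwards the map itself is only read, so no mutex is needed.
	counts := map[topologyPair]*int32{
		{"zone", "zone1"}: new(int32),
		{"zone", "zone2"}: new(int32),
	}

	// Per-node match sums, as computed by processAllNode in the diff.
	nodes := []struct {
		zone     string
		matchSum int32
	}{{"zone1", 2}, {"zone2", 1}, {"zone1", 3}}

	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go func(zone string, matchSum int32) {
			defer wg.Done()
			atomic.AddInt32(counts[topologyPair{"zone", zone}], matchSum)
		}(n.zone, n.matchSum)
	}
	wg.Wait()

	for pair, c := range counts {
		fmt.Printf("%v -> %d matching pods\n", pair, *c)
	}
}
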
68 changes: 15 additions & 53 deletions pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
@@ -17,7 +17,6 @@ limitations under the License.
package priorities

import (
"fmt"
"reflect"
"testing"

@@ -32,8 +31,8 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
name string
pod *v1.Pod
nodes []*v1.Node
wantNodeNameMap map[string]int64
wantTopologyPairMap map[topologyPair]*int64
wantNodeNameMap map[string]int32
wantTopologyPairMap map[topologyPair]*int32
}{
{
name: "normal case",
@@ -46,17 +45,17 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
},
wantNodeNameMap: map[string]int64{
wantNodeNameMap: map[string]int32{
"node-a": 0,
"node-b": 0,
"node-x": 0,
},
wantTopologyPairMap: map[topologyPair]*int64{
{key: "zone", value: "zone1"}: new(int64),
{key: "zone", value: "zone2"}: new(int64),
{key: "node", value: "node-a"}: new(int64),
{key: "node", value: "node-b"}: new(int64),
{key: "node", value: "node-x"}: new(int64),
wantTopologyPairMap: map[topologyPair]*int32{
{key: "zone", value: "zone1"}: new(int32),
{key: "zone", value: "zone2"}: new(int32),
{key: "node", value: "node-a"}: new(int32),
{key: "node", value: "node-b"}: new(int32),
{key: "node", value: "node-x"}: new(int32),
},
},
{
@@ -70,14 +69,14 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
},
wantNodeNameMap: map[string]int64{
wantNodeNameMap: map[string]int32{
"node-a": 0,
"node-b": 0,
},
wantTopologyPairMap: map[topologyPair]*int64{
{key: "zone", value: "zone1"}: new(int64),
{key: "node", value: "node-a"}: new(int64),
{key: "node", value: "node-b"}: new(int64),
wantTopologyPairMap: map[topologyPair]*int32{
{key: "zone", value: "zone1"}: new(int32),
{key: "node", value: "node-a"}: new(int32),
{key: "node", value: "node-b"}: new(int32),
},
},
}
@@ -445,43 +444,6 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
}
}

func makeNodesAndPods(pod *v1.Pod, existingPodsNum, allNodesNum, filteredNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node, filteredNodes []*v1.Node) {
var topologyKeys []string
var labels []string
// regions := 3
zones := 10
for _, c := range pod.Spec.TopologySpreadConstraints {
topologyKeys = append(topologyKeys, c.TopologyKey)
labels = append(labels, c.LabelSelector.MatchExpressions[0].Key)
}
// build nodes
for i := 0; i < allNodesNum; i++ {
nodeWrapper := st.MakeNode().Name(fmt.Sprintf("node%d", i))
for _, tpKey := range topologyKeys {
if tpKey == "zone" {
nodeWrapper = nodeWrapper.Label("zone", fmt.Sprintf("zone%d", i%zones))
} else if tpKey == "node" {
nodeWrapper = nodeWrapper.Label("node", fmt.Sprintf("node%d", i))
}
}
node := nodeWrapper.Obj()
allNodes = append(allNodes, node)
if len(filteredNodes) < filteredNodesNum {
filteredNodes = append(filteredNodes, node)
}
}
// build pods
for i := 0; i < existingPodsNum; i++ {
podWrapper := st.MakePod().Name(fmt.Sprintf("pod%d", i)).Node(fmt.Sprintf("node%d", i%allNodesNum))
// apply labels[0], labels[0,1], ..., labels[all] to each pod in turn
for _, label := range labels[:i%len(labels)+1] {
podWrapper = podWrapper.Label(label, "")
}
existingPods = append(existingPods, podWrapper.Obj())
}
return
}

func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
tests := []struct {
name string
@@ -521,7 +483,7 @@ }
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, filteredNodes := makeNodesAndPods(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
existingPods, allNodes, filteredNodes := st.MakeNodesAndPods(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(existingPods, allNodes)
b.ResetTimer()
for i := 0; i < b.N; i++ {
2 changes: 1 addition & 1 deletion pkg/scheduler/core/generic_scheduler.go
@@ -1107,7 +1107,7 @@ func selectVictimsOnNode(
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp)
meta.RemovePod(rp, nodeInfoCopy.Node())
}
}
addPod := func(ap *v1.Pod) {
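The one-line change above reflects a signature change to PredicateMetadata.RemovePod: with the per-pair pod lists gone from the cache, the metadata needs the victim's node to know which topology-pair counters to decrement. A hedged illustration of why the node is now required — the helper, names, and shapes below are mine, not from the diff:

package main

import "fmt"

type topologyPair struct{ key, value string }

// removePod decrements the cached match count for every topology pair the
// node participates in. Without the node's labels there is no way to map a
// deleted pod back to its pairs once the per-pair pod lists are gone.
func removePod(counts map[topologyPair]int32, nodeLabels map[string]string, topologyKeys []string) {
	for _, k := range topologyKeys {
		if v, ok := nodeLabels[k]; ok {
			counts[topologyPair{k, v}]--
		}
	}
}

func main() {
	counts := map[topologyPair]int32{{"zone", "zone1"}: 3}
	removePod(counts, map[string]string{"zone": "zone1"}, []string{"zone"})
	fmt.Println(counts[topologyPair{"zone", "zone1"}]) // 2
}
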
1 change: 1 addition & 0 deletions pkg/scheduler/testing/BUILD
@@ -6,6 +6,7 @@ go_library(
name = "go_default_library",
srcs = [
"fake_lister.go",
"workload_prep.go",
"wrappers.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/testing",
67 changes: 67 additions & 0 deletions pkg/scheduler/testing/workload_prep.go
@@ -0,0 +1,67 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package testing

import (
"fmt"

"k8s.io/api/core/v1"
)

// MakeNodesAndPods serves as a testing helper for EvenPodsSpread feature.
// It builds a fake cluster containing running Pods and Nodes.
// The size of Pods and Nodes are determined by input arguments.
// The specs of Pods and Nodes are generated with the following rules:
// - If `pod` has "node" as a topologyKey, each generated node is applied with a unique label: "node: node<i>".
// - If `pod` has "zone" as a topologyKey, each generated node is applied with a rotating label: "zone: zone[0-9]".
// - Depending on "labelSelector.MatchExpressions[0].Key" the `pod` has in each topologySpreadConstraint,
// each generated pod will be applied with label "key1", "key1,key2", ..., "key1,key2,...,keyN" in a rotating manner.
func MakeNodesAndPods(pod *v1.Pod, existingPodsNum, allNodesNum, filteredNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node, filteredNodes []*v1.Node) {
var topologyKeys []string
var labels []string
zones := 10
for _, c := range pod.Spec.TopologySpreadConstraints {
topologyKeys = append(topologyKeys, c.TopologyKey)
labels = append(labels, c.LabelSelector.MatchExpressions[0].Key)
}
// build nodes
for i := 0; i < allNodesNum; i++ {
nodeWrapper := MakeNode().Name(fmt.Sprintf("node%d", i))
for _, tpKey := range topologyKeys {
if tpKey == "zone" {
nodeWrapper = nodeWrapper.Label("zone", fmt.Sprintf("zone%d", i%zones))
} else if tpKey == "node" {
nodeWrapper = nodeWrapper.Label("node", fmt.Sprintf("node%d", i))
}
}
node := nodeWrapper.Obj()
allNodes = append(allNodes, node)
if len(filteredNodes) < filteredNodesNum {
filteredNodes = append(filteredNodes, node)
}
}
// build pods
for i := 0; i < existingPodsNum; i++ {
podWrapper := MakePod().Name(fmt.Sprintf("pod%d", i)).Node(fmt.Sprintf("node%d", i%allNodesNum))
// apply labels[0], labels[0,1], ..., labels[all] to each pod in turn
for _, label := range labels[:i%len(labels)+1] {
podWrapper = podWrapper.Label(label, "")
}
existingPods = append(existingPods, podWrapper.Obj())
}
return
}
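For reference, a hypothetical caller of the new helper, mirroring how the benchmark above uses it. The pod spec here is illustrative; only MakeNodesAndPods itself comes from this diff.

package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

func main() {
	// A pod with a single "zone" spread constraint whose selector matches
	// pods labeled "foo" — MakeNodesAndPods reads TopologyKey and
	// LabelSelector.MatchExpressions[0].Key from each constraint.
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			TopologySpreadConstraints: []v1.TopologySpreadConstraint{{
				MaxSkew:     1,
				TopologyKey: "zone",
				LabelSelector: &metav1.LabelSelector{
					MatchExpressions: []metav1.LabelSelectorRequirement{{
						Key:      "foo",
						Operator: metav1.LabelSelectorOpExists,
					}},
				},
			}},
		},
	}
	// 100 existing pods spread across 20 nodes, 10 of which passed Predicates.
	existingPods, allNodes, filteredNodes := st.MakeNodesAndPods(pod, 100, 20, 10)
	_, _, _ = existingPods, allNodes, filteredNodes
}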
