Make node tree order part of the snapshot #84014

Merged: 1 commit, Oct 18, 2019
34 changes: 15 additions & 19 deletions pkg/scheduler/core/generic_scheduler.go
@@ -187,16 +187,15 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
}
trace.Step("Running prefilter plugins done")

numNodes := g.cache.NodeTree().NumNodes()
if numNodes == 0 {
return result, ErrNoNodesAvailable
}

if err := g.snapshot(); err != nil {
return result, err
}
trace.Step("Snapshoting scheduler cache and node infos done")

if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
return result, ErrNoNodesAvailable
}

startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
if err != nil {
@@ -213,7 +212,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
if len(filteredNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: numNodes,
NumAllNodes: len(g.nodeInfoSnapshot.NodeInfoList),
FailedPredicates: failedPredicateMap,
FilteredNodesStatuses: filteredNodesStatuses,
}
@@ -460,13 +459,13 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
if len(g.predicates) == 0 && !g.framework.HasFilterPlugins() {
filtered = g.nodeInfoSnapshot.ListNodes()
} else {
allNodes := int32(g.cache.NodeTree().NumNodes())
allNodes := int32(len(g.nodeInfoSnapshot.NodeInfoList))
numNodesToFind := g.numFeasibleNodesToFind(allNodes)

// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered = make([]*v1.Node, numNodesToFind)
errs := errors.MessageCountMap{}
errCh := util.NewErrorChannel()
var (
predicateResultLock sync.Mutex
filteredLen int32
@@ -479,20 +478,17 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})

checkNode := func(i int) {
nodeName := g.cache.NodeTree().Next()

nodeInfo := g.nodeInfoSnapshot.NodeInfoList[i]
Member commented:

This broke scalability tests: #84151

TL;DR: this breaks spreading of pods in large clusters.

What exactly has happened:
In large enough clusters, we use the feature of finding only N feasible nodes and scoring only those:

numNodesToFind := g.numFeasibleNodesToFind(allNodes)

  1. Before this change, we were looking for nodes starting at the point where we previously stopped (because Next() was done at the level of the original tree).
  2. With this change, we're always starting from 0.

So assume you have 5k nodes, all of them feasible, and numFeasibleNodesToFind picks 250.

  • Previously, you first chose nodes [1..250], then [251..500], ..., [4751..5000], then [1..250] again, and so on.
  • With this change, we will choose [1..250], [1..250], [1..250], ... until one of those nodes becomes infeasible, and only then choose something different.

With this PR, next() is called only in UpdateNodeInfoSnapshot:
https://github.com/kubernetes/kubernetes/pull/84014/files#diff-f4a894ca5e905aa5f613269fc967fe2cR206
and if the set of nodes doesn't change, we will pretty much always generate the same ordered list of nodes.

This effectively breaks the expectation that the scheduler spreads pods across the whole cluster. While that's not a documented feature per se, I don't think this is the right thing to do.

I'm going to open a revert of this PR to fix the scalability tests (or half of them, since we seem to have two different regressions), but I will wait for your explicit approval.
We can discuss how to fix that later.
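
To make the difference concrete, here is a standalone sketch (illustrative only, not scheduler code; all names are made up) of how a persistent cursor spreads work across the cluster, while restarting from index 0 keeps hitting the same prefix of the node list:

package main

import "fmt"

// pickFeasible returns the indices of the next `count` nodes starting at
// *cursor and advances the cursor, wrapping around the cluster. This mimics
// the pre-PR behavior, where next() kept its position across scheduling cycles.
func pickFeasible(totalNodes, count int, cursor *int) []int {
    picked := make([]int, 0, count)
    for i := 0; i < count; i++ {
        picked = append(picked, *cursor%totalNodes)
        *cursor++
    }
    return picked
}

func main() {
    totalNodes, count := 5000, 250

    // Pre-PR: a shared cursor means consecutive cycles examine different slices.
    cursor := 0
    fmt.Println(pickFeasible(totalNodes, count, &cursor)[0]) // 0   -> nodes [0..249]
    fmt.Println(pickFeasible(totalNodes, count, &cursor)[0]) // 250 -> nodes [250..499]

    // Post-PR: every cycle walks the (unchanged) snapshot list from index 0, so
    // as long as the first 250 nodes stay feasible, every cycle evaluates [0..249].
    for cycle := 0; cycle < 2; cycle++ {
        fresh := 0
        fmt.Println(pickFeasible(totalNodes, count, &fresh)[0]) // always 0
    }
}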

Member commented:

Heh... it's no longer possible to auto-revert it.

Member commented:

Fortunately, the conflicts were trivial; opened #84222.

fits, failedPredicates, status, err := g.podFitsOnNode(
ctx,
state,
pod,
meta,
g.nodeInfoSnapshot.NodeInfoMap[nodeName],
nodeInfo,
g.alwaysCheckAllPredicates,
)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
predicateResultLock.Unlock()
errCh.SendErrorWithCancel(err, cancel)
return
}
if fits {
@@ -501,15 +497,15 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
cancel()
atomic.AddInt32(&filteredLen, -1)
} else {
filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
filtered[length-1] = nodeInfo.Node()
}
} else {
predicateResultLock.Lock()
if !status.IsSuccess() {
filteredNodesStatuses[nodeName] = status
filteredNodesStatuses[nodeInfo.Node().Name] = status
}
if len(failedPredicates) != 0 {
failedPredicateMap[nodeName] = failedPredicates
failedPredicateMap[nodeInfo.Node().Name] = failedPredicates
}
predicateResultLock.Unlock()
}
@@ -520,8 +516,8 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

filtered = filtered[:filteredLen]
if len(errs) > 0 {
return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, errors.CreateAggregateFromMessageCountMap(errs)
if err := errCh.ReceiveError(); err != nil {
Member commented:

This doesn't look right: we used to return all predicate errors to the user, but here it changes to a single error.

Member (author) replied:

This was collecting errors from all nodes, not predicates. Why is it useful to keep examining all nodes only to exit anyway? Remember that this is an error, not a predicate failure, so it is likely that something internal went wrong and caused it; continuing to iterate over all the nodes does not seem useful to me.

Member replied:

Ah, I misread: err is for an unexpected internal error, not a PredicateFailure. Never mind then.
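
For readers following along, a minimal standalone sketch of the fail-fast pattern used here (illustrative only, not the actual util.ErrorChannel implementation): the first internal error is recorded and the shared context is cancelled so the remaining workers stop early.

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

// errorChannel keeps only the first error sent to it.
type errorChannel struct {
    ch chan error
}

func newErrorChannel() *errorChannel {
    return &errorChannel{ch: make(chan error, 1)}
}

func (e *errorChannel) sendErrorWithCancel(err error, cancel context.CancelFunc) {
    select {
    case e.ch <- err: // first error wins
    default: // an error is already buffered; drop this one
    }
    cancel()
}

func (e *errorChannel) receiveError() error {
    select {
    case err := <-e.ch:
        return err
    default:
        return nil
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    errCh := newErrorChannel()

    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            select {
            case <-ctx.Done(): // another worker already failed; bail out
                return
            default:
            }
            if i == 3 { // simulate an internal error on one node
                errCh.sendErrorWithCancel(fmt.Errorf("node %d: %w", i, errors.New("internal error")), cancel)
            }
        }(i)
    }
    wg.Wait()
    fmt.Println(errCh.receiveError()) // only the first internal error is reported
}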

return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err
}
}

23 changes: 15 additions & 8 deletions pkg/scheduler/internal/cache/cache.go
@@ -74,7 +74,7 @@ type schedulerCache struct {
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem
nodeTree *NodeTree
nodeTree *nodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
@@ -238,6 +238,17 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodei
}
}
}

// Take a snapshot of the nodes order in the tree
nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
for i := 0; i < cache.nodeTree.numNodes; i++ {
Member commented:

The loop seems to be both memory- and time-consuming; can we have a benchmark covering the whole scheduling cycle?

Member (author) @ahg-g replied on Oct 18, 2019:

Not really: as described in the issue description, we gain about 3%. As for memory, this is just an array of pointers, so even for a cluster with 5k nodes, for example, the overhead is negligible.

Note that we do something similar in the predicates metadata; here are two examples:

allNodeNames := make([]string, 0, len(nodeInfoMap))

allNodeNames := make([]string, 0, len(nodeInfoMap))
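
For reference, a hypothetical micro-benchmark of just the list-building step (simplified stand-in types, not the real cache; the 5k-node figure mirrors the example above), measuring the cost of copying N pointers per snapshot:

package cache

import (
    "fmt"
    "testing"
)

// fakeNodeInfo stands in for schedulernodeinfo.NodeInfo; only the pointer copy
// cost matters for this measurement.
type fakeNodeInfo struct{ name string }

func BenchmarkBuildNodeInfoList(b *testing.B) {
    const numNodes = 5000
    infoMap := make(map[string]*fakeNodeInfo, numNodes)
    order := make([]string, 0, numNodes)
    for i := 0; i < numNodes; i++ {
        name := fmt.Sprintf("node-%d", i)
        infoMap[name] = &fakeNodeInfo{name: name}
        order = append(order, name)
    }
    b.ResetTimer()
    for n := 0; n < b.N; n++ {
        // Mirrors the snapshot loop: append one pointer per node in tree order.
        list := make([]*fakeNodeInfo, 0, numNodes)
        for _, name := range order {
            list = append(list, infoMap[name])
        }
        _ = list
    }
}

A rough memory bound supports the reply above: 5,000 pointers at 8 bytes each is about 40 KB per snapshot.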

nodeName := cache.nodeTree.next()
if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
} else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
}
}
return nil
}

@@ -516,7 +527,7 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
}
cache.moveNodeInfoToHead(node.Name)

cache.nodeTree.AddNode(node)
cache.nodeTree.addNode(node)
cache.addNodeImageStates(node, n.info)
return n.info.SetNode(node)
}
@@ -534,7 +545,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
}
cache.moveNodeInfoToHead(newNode.Name)

cache.nodeTree.UpdateNode(oldNode, newNode)
cache.nodeTree.updateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n.info)
return n.info.SetNode(newNode)
}
@@ -560,7 +571,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.moveNodeInfoToHead(node.Name)
}

if err := cache.nodeTree.RemoveNode(node); err != nil {
if err := cache.nodeTree.removeNode(node); err != nil {
return err
}
cache.removeNodeImageStates(node)
@@ -688,10 +699,6 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
return nil
}

func (cache *schedulerCache) NodeTree() *NodeTree {
return cache.nodeTree
}

// GetNodeInfo returns cached data for the node name.
func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()
6 changes: 3 additions & 3 deletions pkg/scheduler/internal/cache/cache_test.go
@@ -1066,7 +1066,7 @@ func TestNodeOperators(t *testing.T) {
if !found {
t.Errorf("Failed to find node %v in internalcache.", node.Name)
}
if cache.nodeTree.NumNodes() != 1 || cache.nodeTree.Next() != node.Name {
if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
}

@@ -1109,7 +1109,7 @@ func TestNodeOperators(t *testing.T) {
t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
}
// Check nodeTree after update
if cache.nodeTree.NumNodes() != 1 || cache.nodeTree.Next() != node.Name {
if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
}

@@ -1120,7 +1120,7 @@ func TestNodeOperators(t *testing.T) {
}
// Check nodeTree after remove. The node should be removed from the nodeTree even if there are
// still pods on it.
if cache.nodeTree.NumNodes() != 0 || cache.nodeTree.Next() != "" {
if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
}
}
8 changes: 5 additions & 3 deletions pkg/scheduler/internal/cache/fake/fake_cache.go
@@ -103,9 +103,6 @@ func (c *Cache) Snapshot() *internalcache.Snapshot {
return &internalcache.Snapshot{}
}

// NodeTree is a fake method for testing.
func (c *Cache) NodeTree() *internalcache.NodeTree { return nil }

// GetNodeInfo is a fake method for testing.
func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return nil, nil
@@ -120,3 +117,8 @@ func (c *Cache) ListNodes() []*v1.Node {
func (c *Cache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
return nil, nil
}

// NumNodes is a fake method for testing.
func (c *Cache) NumNodes() int {
return 0
}
3 changes: 0 additions & 3 deletions pkg/scheduler/internal/cache/interface.go
@@ -119,9 +119,6 @@ type Cache interface {

// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot

// NodeTree returns a node tree structure
NodeTree() *NodeTree
}

// Snapshot is a snapshot of cache state
58 changes: 17 additions & 41 deletions pkg/scheduler/internal/cache/node_tree.go
@@ -18,22 +18,21 @@ package cache

import (
"fmt"
"sync"

"k8s.io/api/core/v1"
utilnode "k8s.io/kubernetes/pkg/util/node"

"k8s.io/klog"
utilnode "k8s.io/kubernetes/pkg/util/node"
)

// NodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// nodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// keys to "NodeTree.tree" and values of "NodeTree.tree" are arrays of node names.
type NodeTree struct {
// NodeTree is NOT thread-safe, any concurrent updates/reads from it must be synchronized by the caller.
// It is used only by schedulerCache, and should stay as such.
type nodeTree struct {
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
zoneIndex int
numNodes int
mu sync.RWMutex
}

// nodeArray is a struct that has nodes that are in a zone.
@@ -58,8 +57,8 @@ func (na *nodeArray) next() (nodeName string, exhausted bool) {
}

// newNodeTree creates a NodeTree from nodes.
func newNodeTree(nodes []*v1.Node) *NodeTree {
nt := &NodeTree{
func newNodeTree(nodes []*v1.Node) *nodeTree {
nt := &nodeTree{
tree: make(map[string]*nodeArray),
}
for _, n := range nodes {
@@ -68,15 +67,9 @@ func newNodeTree(nodes []*v1.Node) *NodeTree {
return nt
}

// AddNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// addNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// is added to the array of nodes in that zone.
func (nt *NodeTree) AddNode(n *v1.Node) {
nt.mu.Lock()
defer nt.mu.Unlock()
nt.addNode(n)
}

func (nt *NodeTree) addNode(n *v1.Node) {
func (nt *nodeTree) addNode(n *v1.Node) {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na.nodes {
@@ -94,14 +87,8 @@ func (nt *NodeTree) addNode(n *v1.Node) {
nt.numNodes++
}

// RemoveNode removes a node from the NodeTree.
func (nt *NodeTree) RemoveNode(n *v1.Node) error {
nt.mu.Lock()
defer nt.mu.Unlock()
return nt.removeNode(n)
}

func (nt *NodeTree) removeNode(n *v1.Node) error {
// removeNode removes a node from the NodeTree.
func (nt *nodeTree) removeNode(n *v1.Node) error {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na.nodes {
@@ -122,7 +109,7 @@ func (nt *NodeTree) removeNode(n *v1.Node) error {

// removeZone removes a zone from tree.
// This function must be called while writer locks are hold.
func (nt *NodeTree) removeZone(zone string) {
func (nt *nodeTree) removeZone(zone string) {
delete(nt.tree, zone)
for i, z := range nt.zones {
if z == zone {
@@ -132,8 +119,8 @@ func (nt *NodeTree) removeZone(zone string) {
}
}

// UpdateNode updates a node in the NodeTree.
func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
// updateNode updates a node in the NodeTree.
func (nt *nodeTree) updateNode(old, new *v1.Node) {
var oldZone string
if old != nil {
oldZone = utilnode.GetZoneKey(old)
@@ -144,24 +131,20 @@ func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
if oldZone == newZone {
return
}
nt.mu.Lock()
defer nt.mu.Unlock()
nt.removeNode(old) // No error checking. We ignore whether the old node exists or not.
nt.addNode(new)
}

func (nt *NodeTree) resetExhausted() {
func (nt *nodeTree) resetExhausted() {
for _, na := range nt.tree {
na.lastIndex = 0
}
nt.zoneIndex = 0
}

// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
func (nt *NodeTree) Next() string {
nt.mu.Lock()
defer nt.mu.Unlock()
func (nt *nodeTree) next() string {
if len(nt.zones) == 0 {
return ""
}
Expand All @@ -185,10 +168,3 @@ func (nt *NodeTree) Next() string {
}
}
}

// NumNodes returns the number of nodes.
func (nt *NodeTree) NumNodes() int {
nt.mu.RLock()
defer nt.mu.RUnlock()
return nt.numNodes
}
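
For reference, the round-robin-by-zone ordering that next() produces, and that UpdateNodeInfoSnapshot now records in NodeInfoList, can be illustrated with a standalone sketch (a simplified re-implementation with made-up data, not the package code):

package main

import "fmt"

// zoneRoundRobin reproduces, in simplified form, the ordering next() yields:
// it cycles through zones, taking one node from each non-exhausted zone per pass.
func zoneRoundRobin(zones []string, nodesByZone map[string][]string) []string {
    order := []string{}
    idx := map[string]int{}
    remaining := 0
    for _, ns := range nodesByZone {
        remaining += len(ns)
    }
    for remaining > 0 {
        for _, z := range zones {
            if idx[z] < len(nodesByZone[z]) {
                order = append(order, nodesByZone[z][idx[z]])
                idx[z]++
                remaining--
            }
        }
    }
    return order
}

func main() {
    zones := []string{"zone-a", "zone-b"}
    nodes := map[string][]string{
        "zone-a": {"a-1", "a-2", "a-3"},
        "zone-b": {"b-1"},
    }
    // The snapshot order alternates zones while both still have nodes left:
    fmt.Println(zoneRoundRobin(zones, nodes)) // [a-1 b-1 a-2 a-3]
}

Because the snapshot preserves this interleaving, filtering only the first N entries of NodeInfoList still tends to pick nodes from multiple zones rather than draining one zone first.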