Keep track of remaining pods when a node is deleted #93938

Merged (2 commits) on Aug 13, 2020

1 change: 0 additions & 1 deletion pkg/scheduler/BUILD
@@ -84,7 +84,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
6 changes: 6 additions & 0 deletions pkg/scheduler/framework/v1alpha1/types.go
@@ -625,6 +625,12 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
return nil
}

// RemoveNode removes the node object, leaving all other tracking information.
func (n *NodeInfo) RemoveNode() {
n.node = nil
n.Generation = nextGeneration()
}

// FilterOutPods receives a list of pods and filters out those whose node names
// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
//
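
To make the intent of the new `NodeInfo.RemoveNode` helper concrete, here is a minimal test-style sketch (not part of this PR; the test name and fixtures are assumptions for illustration) showing that only the node object is dropped while the pod accounting on the NodeInfo survives:

```go
package v1alpha1

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Illustrative sketch: RemoveNode clears the *v1.Node object only; the pods
// tracked by the NodeInfo (and their aggregated resources) are kept.
func TestRemoveNodeKeepsPodAccounting(t *testing.T) {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "ns1", UID: "p1-uid"}}
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

	ni := NewNodeInfo(pod) // NodeInfo already tracking one pod
	if err := ni.SetNode(node); err != nil {
		t.Fatal(err)
	}

	ni.RemoveNode()
	if ni.Node() != nil {
		t.Error("expected the node object to be cleared")
	}
	if len(ni.Pods) != 1 {
		t.Errorf("expected pod accounting to be preserved, got %d pods", len(ni.Pods))
	}
}
```

The scheduler cache relies on exactly this property to keep a "ghost" NodeInfo around until all pod deletion events for the deleted node have arrived (see cache.go below).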
1 change: 0 additions & 1 deletion pkg/scheduler/internal/cache/BUILD
@@ -16,7 +16,6 @@ go_library(
"//pkg/scheduler/metrics:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
73 changes: 32 additions & 41 deletions pkg/scheduler/internal/cache/cache.go
@@ -22,7 +22,6 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -315,7 +314,9 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
}
}

func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, error) {
// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
func (cache *schedulerCache) PodCount() (int, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We
@@ -325,15 +326,11 @@ func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, erro
for _, n := range cache.nodes {
maxSize += len(n.info.Pods)
}
pods := make([]*v1.Pod, 0, maxSize)
count := 0
for _, n := range cache.nodes {
for _, p := range n.info.Pods {
if selector.Matches(labels.Set(p.Pod.Labels)) {
pods = append(pods, p.Pod)
}
}
count += len(n.info.Pods)
}
return pods, nil
return count, nil
}

func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
@@ -423,13 +420,6 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {

// Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
if _, ok := cache.nodes[newPod.Spec.NodeName]; !ok {
// The node might have been deleted already.
// This is not a problem in the case where a pod update arrives before the
// node creation, because we will always have a create pod event before
// that, which will create the placeholder node item.
return nil
}
if err := cache.removePod(oldPod); err != nil {
return err
}
@@ -438,18 +428,23 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
}

// Assumes that lock is already acquired.
// Removes a pod from the cached node info. When a node is removed, some pod
// deletion events might arrive later. This is not a problem, as the pods in
// the node are assumed to be removed already.
// Removes a pod from the cached node info. If the node information was already
// removed and there are no more pods left in the node, cleans up the node from
// the cache.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
Member: I recall the original logic returned an error?

Member Author: It did. But returning nil is actually safer in the case of extraneous update events that might arrive before a node is created, and after the original node was completely removed.

Member: +1 to returning nil and just logging an error.

Member: I checked the usage of removePod(); a number of callers still rely on the returned value, so I'd suggest reverting to the original behavior.

Member Author: Detail for each caller:

  • ForgetPod: we actually want to proceed and clear assumedPods and podStates.
  • expirePod: same as above.
  • AddPod: it just logs the returned error, so the effect is the same.
  • RemovePod: we want to clear podStates.
  • updatePod: this is the case where we want to prevent losing information.

Member Author: That said, for expirePod and ForgetPod the node shouldn't have been removed in the first place, because it still had pods assigned.

Member: Thanks. That's fair.

return nil
}
if err := n.info.RemovePod(pod); err != nil {
return err
}
cache.moveNodeInfoToHead(pod.Spec.NodeName)
if len(n.info.Pods) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(pod.Spec.NodeName)
} else {
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
return nil
}

@@ -619,21 +614,30 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
return n.info.SetNode(newNode)
}

// RemoveNode removes a node from the cache.
// Some nodes might still have pods because their deletion events didn't arrive
// yet. For most intents and purposes, those pods are removed from the cache,
// having it's source of truth in the cached nodes.
// However, some information on pods (assumedPods, podStates) persist. These
// caches will be eventually consistent as pod deletion events arrive.
// RemoveNode removes a node from the cache's node tree.
// The node might still have pods because their deletion events didn't arrive
// yet. Those pods are considered removed from the cache, with the node tree
// acting as the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. Ghost nodes are skipped in snapshots.
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()

_, ok := cache.nodes[node.Name]
n, ok := cache.nodes[node.Name]
if !ok {
return fmt.Errorf("node %v is not found", node.Name)
}
cache.removeNodeInfoFromList(node.Name)
n.info.RemoveNode()
// We remove NodeInfo for this node only if there aren't any pods on this node.
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
if len(n.info.Pods) == 0 {
cache.removeNodeInfoFromList(node.Name)
} else {
cache.moveNodeInfoToHead(node.Name)
}
if err := cache.nodeTree.removeNode(node); err != nil {
return err
}
@@ -736,19 +740,6 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
return nil
}

// GetNodeInfo returns cached data for the node name.
Member: I think 2 functions in pkg/scheduler/internal/cache/fake/fake_cache.go can also be removed:

  • func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error)
  • func (c *Cache) ListNodes() []*v1.Node

Member Author: Done

func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()

n, ok := cache.nodes[nodeName]
if !ok {
return nil, fmt.Errorf("node %q not found in cache", nodeName)
}

return n.info.Node(), nil
}

// updateMetrics updates cache size metric values for pods, assumed pods, and nodes
func (cache *schedulerCache) updateMetrics() {
metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
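
The interplay between `RemoveNode` and `removePod` above is easier to follow end to end. The following test-style sketch (not part of this PR) walks through the intended lifecycle; it assumes the `newSchedulerCache` constructor and method signatures used elsewhere in this file, plus the `getNodeInfo` helper added to cache_test.go below:

```go
package cache

import (
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Illustrative sketch of the node/pod deletion race handled above: the node is
// deleted first, its entry survives as a "ghost" while a pod is still tracked,
// and the late pod deletion event finally cleans the entry up.
func TestGhostNodeIsKeptUntilPodsAreGone(t *testing.T) {
	cache := newSchedulerCache(time.Minute, time.Second, nil)

	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "ns1", UID: "p1-uid"},
		Spec:       v1.PodSpec{NodeName: "node-1"},
	}

	if err := cache.AddNode(node); err != nil {
		t.Fatal(err)
	}
	if err := cache.AddPod(pod); err != nil {
		t.Fatal(err)
	}

	// The node deletion event arrives before the pod deletion event.
	if err := cache.RemoveNode(node); err != nil {
		t.Fatal(err)
	}
	if n, err := cache.getNodeInfo("node-1"); err != nil || n != nil {
		t.Errorf("expected a ghost entry with a nil node object, got node %v, err %v", n, err)
	}
	if count, _ := cache.PodCount(); count != 1 {
		t.Errorf("expected the remaining pod to still be counted, got %d", count)
	}

	// The late pod deletion event removes the last pod and the ghost entry.
	if err := cache.RemovePod(pod); err != nil {
		t.Fatal(err)
	}
	if _, err := cache.getNodeInfo("node-1"); err == nil {
		t.Error("expected the ghost entry to be cleaned up after its last pod was removed")
	}
}
```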
25 changes: 20 additions & 5 deletions pkg/scheduler/internal/cache/cache_test.go
@@ -268,7 +268,7 @@ func TestExpirePod(t *testing.T) {
{pod: testPods[0], finishBind: true, assumedTime: now},
},
cleanupTime: now.Add(2 * ttl),
wNodeInfo: framework.NewNodeInfo(),
wNodeInfo: nil,
}, { // first one would expire, second and third would not.
pods: []*testExpirePodStruct{
{pod: testPods[0], finishBind: true, assumedTime: now},
@@ -1142,10 +1142,12 @@ func TestNodeOperators(t *testing.T) {
if err := cache.RemoveNode(node); err != nil {
t.Error(err)
}
if _, err := cache.GetNodeInfo(node.Name); err == nil {
t.Errorf("The node %v should be removed.", node.Name)
if n, err := cache.getNodeInfo(node.Name); err != nil {
t.Errorf("The node %v should still have a ghost entry: %v", node.Name, err)
} else if n != nil {
t.Errorf("The node object for %v should be nil", node.Name)
}
// Check node is removed from nodeTree as well.
// Check node is removed from nodeTree.
if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
}
@@ -1466,7 +1468,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
var i int
// Check that cache is in the expected state.
for node := cache.headNode; node != nil; node = node.next {
if node.info.Node().Name != test.expected[i].Name {
if node.info.Node() != nil && node.info.Node().Name != test.expected[i].Name {
t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
}
i++
@@ -1798,3 +1800,16 @@ func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
}
return nil
}

// getNodeInfo returns cached data for the node name.
func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()

n, ok := cache.nodes[nodeName]
if !ok {
return nil, fmt.Errorf("node %q not found in cache", nodeName)
}

return n.info.Node(), nil
}
12 changes: 6 additions & 6 deletions pkg/scheduler/internal/cache/debugger/dumper.go
@@ -45,8 +45,8 @@ func (d *CacheDumper) DumpAll() {
func (d *CacheDumper) dumpNodes() {
dump := d.cache.Dump()
klog.Info("Dump of cached NodeInfo")
for _, nodeInfo := range dump.Nodes {
klog.Info(d.printNodeInfo(nodeInfo))
for name, nodeInfo := range dump.Nodes {
klog.Info(d.printNodeInfo(name, nodeInfo))
}
}

@@ -61,16 +61,16 @@ func (d *CacheDumper) dumpSchedulingQueue() {
}

// printNodeInfo writes parts of NodeInfo to a string.
func (d *CacheDumper) printNodeInfo(n *framework.NodeInfo) string {
func (d *CacheDumper) printNodeInfo(name string, n *framework.NodeInfo) string {
var nodeData strings.Builder
nodeData.WriteString(fmt.Sprintf("\nNode name: %+v\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
n.Node().Name, n.Requested, n.Allocatable, len(n.Pods)))
nodeData.WriteString(fmt.Sprintf("\nNode name: %s\nDeleted: %t\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
name, n.Node() == nil, n.Requested, n.Allocatable, len(n.Pods)))
// Dumping Pod Info
for _, p := range n.Pods {
nodeData.WriteString(printPod(p.Pod))
}
// Dumping nominated pods info on the node
nominatedPods := d.podQueue.NominatedPodsForNode(n.Node().Name)
nominatedPods := d.podQueue.NominatedPodsForNode(name)
if len(nominatedPods) != 0 {
nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPods)))
for _, p := range nominatedPods {
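
For orientation, with the format string above a deleted ("ghost") node that still has one pending pod would show up in the dump roughly like this (values are illustrative; the %+v-rendered resource structs and the per-pod lines produced by printPod are abbreviated):

```
Node name: node-1
Deleted: true
Requested Resources: &{...}
Allocatable Resources:&{...}
Scheduled Pods(number: 1):
<one line per scheduled pod, from printPod>
```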
1 change: 0 additions & 1 deletion pkg/scheduler/internal/cache/fake/BUILD
@@ -8,7 +8,6 @@ go_library(
deps = [
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)

15 changes: 2 additions & 13 deletions pkg/scheduler/internal/cache/fake/fake_cache.go
@@ -18,7 +18,6 @@ package fake

import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

@@ -78,20 +77,10 @@ func (c *Cache) UpdateSnapshot(snapshot *internalcache.Snapshot) error {
return nil
}

// ListPods is a fake method for testing.
func (c *Cache) ListPods(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
// PodCount is a fake method for testing.
func (c *Cache) PodCount() (int, error) { return 0, nil }

// Dump is a fake method for testing.
func (c *Cache) Dump() *internalcache.Dump {
return &internalcache.Dump{}
}

// GetNodeInfo is a fake method for testing.
func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return nil, nil
}

// ListNodes is a fake method for testing.
func (c *Cache) ListNodes() []*v1.Node {
return nil
}
5 changes: 2 additions & 3 deletions pkg/scheduler/internal/cache/interface.go
@@ -18,7 +18,6 @@ package cache

import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

@@ -57,8 +56,8 @@ import (
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
// ListPods lists all pods in the cache.
ListPods(selector labels.Selector) ([]*v1.Pod, error)
// PodCount returns the number of pods in the cache (including those from deleted nodes).
PodCount() (int, error)

// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
5 changes: 2 additions & 3 deletions pkg/scheduler/scheduler_test.go
@@ -36,7 +36,6 @@ import (
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@@ -527,12 +526,12 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
return
default:
}
pods, err := scache.ListPods(labels.Everything())
pods, err := scache.PodCount()
if err != nil {
errChan <- fmt.Errorf("cache.List failed: %v", err)
return
}
if len(pods) == 0 {
if pods == 0 {
close(waitPodExpireChan)
return
}
1 change: 0 additions & 1 deletion test/integration/scheduler/BUILD
@@ -96,7 +96,6 @@ go_library(
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
5 changes: 2 additions & 3 deletions test/integration/scheduler/util.go
@@ -26,7 +26,6 @@ import (
policy "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
@@ -401,11 +400,11 @@ func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisrupt
// waitCachedPodsStable waits until scheduler cache has the given pods.
func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := testCtx.Scheduler.SchedulerCache.ListPods(labels.Everything())
cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount()
if err != nil {
return false, err
}
if len(pods) != len(cachedPods) {
if len(pods) != cachedPods {
return false, nil
}
for _, p := range pods {