Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: predicates read nodes from scheduler cache #79076

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 22 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -453,7 +454,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
go r.Run(wait.NeverStop)
}
nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}

// TODO: get the real node object of ourself,
// and use the real node name and UID.
Expand Down Expand Up @@ -2281,3 +2282,23 @@ func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kub
}
return config
}

// CachedNodeInfo implements NodeInfo
type CachedNodeInfo struct {
corelisters.NodeLister
}

// GetNodeInfo returns cached data for the node name.
func (c *CachedNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
node, err := c.Get(nodeName)

if apierrors.IsNotFound(err) {
return nil, err
}

if err != nil {
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", nodeName, err)
}

return node, nil
}
20 changes: 0 additions & 20 deletions pkg/scheduler/algorithm/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,26 +196,6 @@ func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace
return c.PersistentVolumeClaims(namespace).Get(name)
}

// CachedNodeInfo implements NodeInfo
type CachedNodeInfo struct {
corelisters.NodeLister
}

// GetNodeInfo returns cached data for the node 'id'.
func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
node, err := c.Get(id)

if apierrors.IsNotFound(err) {
return nil, err
}

if err != nil {
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
}

return node, nil
}

// StorageClassInfo interface represents anything that can get a storage class object by class name.
type StorageClassInfo interface {
GetStorageClassInfo(className string) (*storagev1.StorageClass, error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
StatefulSetLister: c.statefulSetLister,
NodeLister: &nodeLister{c.nodeLister},
PDBLister: c.pdbLister,
NodeInfo: &predicates.CachedNodeInfo{NodeLister: c.nodeLister},
NodeInfo: c.schedulerCache,
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
Expand Down
13 changes: 13 additions & 0 deletions pkg/scheduler/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,3 +707,16 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
func (cache *schedulerCache) NodeTree() *NodeTree {
return cache.nodeTree
}

// GetNodeInfo returns cached data for the node name.
func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()

n, ok := cache.nodes[nodeName]
if !ok {
return nil, fmt.Errorf("error retrieving node '%v' from cache", nodeName)
}

return n.info.Node(), nil
}
5 changes: 5 additions & 0 deletions pkg/scheduler/internal/cache/fake/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,8 @@ func (c *Cache) Snapshot() *internalcache.Snapshot {

// NodeTree is a fake method for testing.
func (c *Cache) NodeTree() *internalcache.NodeTree { return nil }

// GetNodeInfo is a fake method for testing.
func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return nil, nil
}
3 changes: 3 additions & 0 deletions pkg/scheduler/internal/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type Cache interface {
// RemoveCSINode removes overall CSI-related information about node.
RemoveCSINode(csiNode *storagev1beta1.CSINode) error

// GetNodeInfo returns the node object with node string.
GetNodeInfo(nodeName string) (*v1.Node, error)

// List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*v1.Pod, error)

Expand Down