Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a cachedNodeInfo in predicates #17827

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 0 additions & 17 deletions pkg/client/cache/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,23 +138,6 @@ func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
return
}

// TODO Move this back to scheduler as a helper function that takes a Store,
// rather than a method of StoreToNodeLister.
// GetNodeInfo returns cached data for the node 'id'.
func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) {
node, exists, err := s.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})

if err != nil {
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
}

if !exists {
return nil, fmt.Errorf("node '%v' is not in cache", id)
}

return node.(*api.Node), nil
}

// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
type StoreToReplicationControllerLister struct {
Store
Expand Down
6 changes: 4 additions & 2 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func NewMainKubelet(
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
}
nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
nodeInfo := &predicates.CachedNodeInfo{nodeLister}

// TODO: get the real node object of ourself,
// and use the real node name and UID.
Expand Down Expand Up @@ -301,6 +302,7 @@ func NewMainKubelet(
clusterDNS: clusterDNS,
serviceLister: serviceLister,
nodeLister: nodeLister,
nodeInfo: nodeInfo,
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
Expand Down Expand Up @@ -473,7 +475,6 @@ type serviceLister interface {

type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}

// Kubelet is the main kubelet implementation.
Expand Down Expand Up @@ -527,6 +528,7 @@ type Kubelet struct {
masterServiceNamespace string
serviceLister serviceLister
nodeLister nodeLister
nodeInfo predicates.NodeInfo

// a list of node labels to register
nodeLabels []string
Expand Down Expand Up @@ -822,7 +824,7 @@ func (kl *Kubelet) GetNode() (*api.Node, error) {
if kl.standaloneMode {
return nil, errors.New("no node entry for kubelet in standalone mode")
}
return kl.nodeLister.GetNodeInfo(kl.nodeName)
return kl.nodeInfo.GetNodeInfo(kl.nodeName)
}

// Starts garbage collection threads.
Expand Down
10 changes: 9 additions & 1 deletion pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
kubelet.nodeInfo = testNodeInfo{}
kubelet.recorder = fakeRecorder
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
Expand Down Expand Up @@ -1045,7 +1046,11 @@ type testNodeLister struct {
nodes []api.Node
}

func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
type testNodeInfo struct {
nodes []api.Node
}

func (ls testNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
for _, node := range ls.nodes {
if node.Name == id {
return &node, nil
Expand Down Expand Up @@ -2319,6 +2324,9 @@ func TestHandleNodeSelector(t *testing.T) {
kl.nodeLister = testNodeLister{nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}},
}}
kl.nodeInfo = testNodeInfo{nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}},
}}
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/runonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestRunOnce(t *testing.T) {
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
nodeLister: testNodeLister{},
nodeInfo: testNodeInfo{},
statusManager: status.NewManager(nil, podManager),
containerRefManager: kubecontainer.NewRefManager(),
podManager: podManager,
Expand Down
20 changes: 20 additions & 0 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
Expand Down Expand Up @@ -52,6 +53,25 @@ func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
return nodes.Nodes().Get(nodeID)
}

type CachedNodeInfo struct {
*cache.StoreToNodeLister
}

// GetNodeInfo returns cached data for the node 'id'.
func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
node, exists, err := c.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})

if err != nil {
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
}

if !exists {
return nil, fmt.Errorf("node '%v' is not in cache", id)
}

return node.(*api.Node), nil
}

func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
if volume.GCEPersistentDisk != nil {
disk := volume.GCEPersistentDisk
Expand Down
3 changes: 2 additions & 1 deletion plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/api/validation"

Expand Down Expand Up @@ -176,7 +177,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
ControllerLister: f.ControllerLister,
// All fit predicates only need to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
NodeInfo: f.NodeLister,
NodeInfo: &predicates.CachedNodeInfo{f.NodeLister},
}
predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
if err != nil {
Expand Down