Scheduler: replace system modeler with scheduler cache and do O(1) lookup for resource req #20669

Merged 1 commit on Feb 27, 2016.
30 changes: 8 additions & 22 deletions plugin/pkg/scheduler/algorithm/priorities/priorities.go
@@ -44,19 +44,12 @@ func calculateScore(requested int64, capacity int64, node string) int {

// Calculate the resource occupancy on a node. 'node' has information about the resources on the node.
// 'pods' is a list of pods currently scheduled on the node.
- func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
- totalMilliCPU := int64(0)
- totalMemory := int64(0)
+ func calculateResourceOccupancy(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+ totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
+ totalMemory := nodeInfo.NonZeroRequest().Memory
capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
capacityMemory := node.Status.Allocatable.Memory().Value()

- for _, existingPod := range pods {
- for _, container := range existingPod.Spec.Containers {
- cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
- totalMilliCPU += cpu
- totalMemory += memory
- }
- }
// Add the resources requested by the current pod being scheduled.
// This also helps differentiate between differently sized, but empty, nodes.
for _, container := range pod.Spec.Containers {
@@ -93,7 +86,7 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulerca

list := schedulerapi.HostPriorityList{}
for _, node := range nodes.Items {
- list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name].Pods()))
+ list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name]))
}
return list, nil
}
@@ -227,22 +220,15 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul

list := schedulerapi.HostPriorityList{}
for _, node := range nodes.Items {
- list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name].Pods()))
+ list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name]))
}
return list, nil
}

- func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
- totalMilliCPU := int64(0)
- totalMemory := int64(0)
+ func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+ totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
+ totalMemory := nodeInfo.NonZeroRequest().Memory
score := int(0)
- for _, existingPod := range pods {
- for _, container := range existingPod.Spec.Containers {
- cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
- totalMilliCPU += cpu
- totalMemory += memory
- }
- }
// Add the resources requested by the current pod being scheduled.
// This also helps differentiate between differently sized, but empty, nodes.
for _, container := range pod.Spec.Containers {
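The priorities.go change above is the core of the O(1) lookup: instead of re-walking every container of every pod on a node for each priority evaluation, the scheduler reads a running total kept per node by the cache. The NodeInfo internals are not part of this diff; the following is only a rough sketch of how such an aggregate could be maintained (the Resource type, field names, and the priorityutil import path are assumptions for illustration, not the actual schedulercache implementation).

package schedulercache

import (
	"k8s.io/kubernetes/pkg/api"
	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)

// Resource is a hypothetical aggregate of the non-zero resource requests on a node.
type Resource struct {
	MilliCPU int64
	Memory   int64
}

// NodeInfo (sketch) keeps running totals so that NonZeroRequest() is a field read,
// not a loop over every scheduled pod's containers.
type NodeInfo struct {
	pods           []*api.Pod
	nonZeroRequest Resource
}

// NonZeroRequest returns the aggregated requests consumed by the priority functions above.
func (n *NodeInfo) NonZeroRequest() Resource { return n.nonZeroRequest }

// addPod updates the aggregate when a scheduled pod is added to this node's info.
func (n *NodeInfo) addPod(pod *api.Pod) {
	for _, c := range pod.Spec.Containers {
		cpu, mem := priorityutil.GetNonzeroRequests(&c.Resources.Requests)
		n.nonZeroRequest.MilliCPU += cpu
		n.nonZeroRequest.Memory += mem
	}
	n.pods = append(n.pods, pod)
}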
@@ -138,7 +138,6 @@ func TestZeroRequest(t *testing.T) {
list, err := scheduler.PrioritizeNodes(
test.pod,
nodeNameToInfo,
- algorithm.FakePodLister(test.pods),
// This should match the configuration in defaultPriorities() in
// plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
// to test what's actually in production.
3 changes: 2 additions & 1 deletion plugin/pkg/scheduler/extender_test.go
@@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)

type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error)
@@ -285,7 +286,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}
- scheduler := NewGenericScheduler(test.predicates, test.prioritizers, extenders, algorithm.FakePodLister(test.pods), random)
+ scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, extenders, random)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {
if err == nil {
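The extender test now seeds the scheduler with schedulertesting.PodsToCache(test.pods) instead of a fake pod lister. That helper's implementation is outside this diff; a plausible sketch, assuming it is a slice type that satisfies the cache interface with no-op mutators and that schedulercache.CreateNodeNameToInfoMap remains exported, would be:

package schedulertesting

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// PodsToCache (sketch) wraps a fixed pod list so tests can hand the generic
// scheduler something that looks like the scheduler cache.
type PodsToCache []*api.Pod

// GetNodeNameToInfoMap groups the fixed pods by node name, mirroring the
// lookup the scheduler performs against the real cache.
func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
	return schedulercache.CreateNodeNameToInfoMap(p), nil
}

// The mutating methods required by the cache interface would be no-ops in
// such a test helper, for example:
func (p PodsToCache) AddPod(pod *api.Pod) error    { return nil }
func (p PodsToCache) RemovePod(pod *api.Pod) error { return nil }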
68 changes: 50 additions & 18 deletions plugin/pkg/scheduler/factory/factory.go
@@ -42,6 +42,7 @@ import (

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

const (
@@ -74,7 +75,7 @@ type ConfigFactory struct {
StopEverything chan struct{}

scheduledPodPopulator *framework.Controller
- modeler scheduler.SystemModeler
+ schedulerCache schedulercache.Cache

// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on pods's annotation key:
@@ -84,6 +85,9 @@ type ConfigFactory struct {

// Initializes the factory.
func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory {
+ stopEverything := make(chan struct{})
+ schedulerCache := schedulercache.New(30*time.Second, stopEverything)
+
c := &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
@@ -95,12 +99,12 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
- StopEverything: make(chan struct{}),
+ schedulerCache: schedulerCache,
+ StopEverything: stopEverything,
SchedulerName: schedulerName,
}
- modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister)
- c.modeler = modeler
- c.PodLister = modeler.PodLister()
+
+ c.PodLister = schedulerCache

// On add/delete to the scheduled pods, remove from the assumed pods.
// We construct this here instead of in CreateFromKeys because
@@ -112,21 +116,49 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
0,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
- if pod, ok := obj.(*api.Pod); ok {
- c.modeler.LockedAction(func() {
- c.modeler.ForgetPod(pod)
- })
+ pod, ok := obj.(*api.Pod)
+ if !ok {
+ glog.Errorf("cannot convert to *api.Pod")
+ return
+ }
}
+ if err := schedulerCache.AddPod(pod); err != nil {
+ glog.Errorf("scheduler cache AddPod failed: %v", err)
+ }
},
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ oldPod, ok := oldObj.(*api.Pod)
+ if !ok {
+ glog.Errorf("cannot convert to *api.Pod")
+ return
+ }
+ newPod, ok := newObj.(*api.Pod)
+ if !ok {
+ glog.Errorf("cannot convert to *api.Pod")
+ return
+ }
+ if err := schedulerCache.UpdatePod(oldPod, newPod); err != nil {
+ glog.Errorf("scheduler cache UpdatePod failed: %v", err)
+ }
+ },
DeleteFunc: func(obj interface{}) {
- c.modeler.LockedAction(func() {
- switch t := obj.(type) {
- case *api.Pod:
- c.modeler.ForgetPod(t)
- case cache.DeletedFinalStateUnknown:
- c.modeler.ForgetPodByKey(t.Key)
+ var pod *api.Pod
+ switch t := obj.(type) {
+ case *api.Pod:
+ pod = t
+ case cache.DeletedFinalStateUnknown:
+ var ok bool
+ pod, ok = t.Obj.(*api.Pod)
+ if !ok {
+ glog.Errorf("cannot convert to *api.Pod")
+ return
+ }
- })
+ default:
+ glog.Errorf("cannot convert to *api.Pod")
+ return
}
+ if err := schedulerCache.RemovePod(pod); err != nil {
+ glog.Errorf("scheduler cache RemovePod failed: %v", err)
+ }
},
},
)
@@ -241,7 +273,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,

r := rand.New(rand.NewSource(time.Now().UnixNano()))

- algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r)
+ algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)

podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@@ -252,7 +284,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
}

return &scheduler.Config{
- Modeler: f.modeler,
+ SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
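factory.go now owns the cache lifecycle: it creates the cache with a 30-second TTL (the expiry window for assumed pods) tied to StopEverything, and translates the scheduled-pod informer's add/update/delete events into cache mutations, so the scheduler no longer re-lists pods each cycle. The full Cache interface lives in schedulercache and is not shown in this PR; the methods exercised by these call sites imply roughly the following shape (signatures inferred from this diff, not authoritative):

package schedulercache

import "k8s.io/kubernetes/pkg/api"

// Cache (sketch): only the methods used by factory.go and generic_scheduler.go
// in this PR are listed; the real interface also covers assuming pods before
// binding, which this diff does not touch.
type Cache interface {
	// Mirror informer events for pods that are already scheduled.
	AddPod(pod *api.Pod) error
	UpdatePod(oldPod, newPod *api.Pod) error
	RemovePod(pod *api.Pod) error

	// Snapshot the per-node aggregates consumed by predicates and priorities.
	GetNodeNameToInfoMap() (map[string]*NodeInfo, error)
}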
18 changes: 8 additions & 10 deletions plugin/pkg/scheduler/generic_scheduler.go
@@ -25,7 +25,6 @@ import (

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@@ -55,10 +54,10 @@ func (f *FitError) Error() string {
}

type genericScheduler struct {
+ cache schedulercache.Cache
predicates map[string]algorithm.FitPredicate
prioritizers []algorithm.PriorityConfig
extenders []algorithm.SchedulerExtender
- pods algorithm.PodLister
random *rand.Rand
randomLock sync.Mutex
}
@@ -75,13 +74,12 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
return "", ErrNoNodesAvailable
}

- // TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute.
- // But at least we're now only doing it in one place
- pods, err := g.pods.List(labels.Everything())
+ // Used for all fit and priority funcs.
+ nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap()
if err != nil {
return "", err
}
- nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods)
+
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
if err != nil {
return "", err
@@ -94,7 +92,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
}
}

- priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
+ priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
if err != nil {
return "", err
}
@@ -188,7 +186,7 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
// Each priority function can also have its own weight
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
- func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
+ func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
result := schedulerapi.HostPriorityList{}

// If no priority configs are provided, then the EqualPriority function is applied
@@ -288,12 +286,12 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf
return result, nil
}

- func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm {
+ func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, random *rand.Rand) algorithm.ScheduleAlgorithm {
return &genericScheduler{
+ cache: cache,
predicates: predicates,
prioritizers: prioritizers,
extenders: extenders,
- pods: pods,
random: random,
}
}
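Putting the pieces together, wiring a scheduler against the new constructor looks roughly like this (hypothetical helper; the surrounding package and parameter values are illustrative, and only the NewGenericScheduler argument order and Schedule signature come from the diff above):

package scheduler

import (
	"math/rand"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// scheduleOne is a hypothetical sketch: build a generic scheduler on top of a
// cache (which replaces the old algorithm.PodLister argument) and schedule a
// single pod. Schedule reads node state via cache.GetNodeNameToInfoMap()
// internally, so no pod list is recomputed per scheduling cycle.
func scheduleOne(
	cache schedulercache.Cache,
	predicates map[string]algorithm.FitPredicate,
	prioritizers []algorithm.PriorityConfig,
	extenders []algorithm.SchedulerExtender,
	pod *api.Pod,
	nodes api.NodeList,
) (string, error) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	sched := NewGenericScheduler(cache, predicates, prioritizers, extenders, r)
	return sched.Schedule(pod, algorithm.FakeNodeLister(nodes))
}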
3 changes: 2 additions & 1 deletion plugin/pkg/scheduler/generic_scheduler_test.go
@@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)

func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
@@ -256,7 +257,7 @@ func TestGenericScheduler(t *testing.T) {

for _, test := range tests {
random := rand.New(rand.NewSource(0))
- scheduler := NewGenericScheduler(test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, algorithm.FakePodLister(test.pods), random)
+ scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, random)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {
if err == nil {