Refactor scheduler to enable switch on equivalence cache #34685

Merged (1 commit, Oct 18, 2016)
14 changes: 7 additions & 7 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -91,8 +91,8 @@ func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo)
}
return &predicateMetadata{
podBestEffort: isPodBestEffort(pod),
- podRequest: getResourceRequest(pod),
- podPorts: getUsedPorts(pod),
+ podRequest: GetResourceRequest(pod),
+ podPorts: GetUsedPorts(pod),
matchingAntiAffinityTerms: matchingTerms,
}
}
@@ -417,7 +417,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo)
return true, nil, nil
}

-func getResourceRequest(pod *api.Pod) *schedulercache.Resource {
+func GetResourceRequest(pod *api.Pod) *schedulercache.Resource {
result := schedulercache.Resource{}
for _, container := range pod.Spec.Containers {
requests := container.Resources.Requests
@@ -459,7 +459,7 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo)
podRequest = predicateMeta.podRequest
} else {
// We couldn't parse metadata - fallback to computing it.
- podRequest = getResourceRequest(pod)
+ podRequest = GetResourceRequest(pod)
}
if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 {
return len(predicateFails) == 0, predicateFails, nil
@@ -702,14 +702,14 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo)
wantPorts = predicateMeta.podPorts
} else {
// We couldn't parse metadata - fallback to computing it.
- wantPorts = getUsedPorts(pod)
+ wantPorts = GetUsedPorts(pod)
}
if len(wantPorts) == 0 {
return true, nil, nil
}

// TODO: Aggregate it at the NodeInfo level.
- existingPorts := getUsedPorts(nodeInfo.Pods()...)
+ existingPorts := GetUsedPorts(nodeInfo.Pods()...)
for wport := range wantPorts {
if wport != 0 && existingPorts[wport] {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
@@ -718,7 +718,7 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo)
return true, nil, nil
}

-func getUsedPorts(pods ...*api.Pod) map[int]bool {
+func GetUsedPorts(pods ...*api.Pod) map[int]bool {
ports := make(map[int]bool)
for _, pod := range pods {
for j := range pod.Spec.Containers {
plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -467,7 +467,7 @@ func TestGetUsedPorts(t *testing.T) {
}

for _, test := range tests {
- ports := getUsedPorts(test.pods...)
+ ports := GetUsedPorts(test.pods...)
if !reflect.DeepEqual(test.ports, ports) {
t.Errorf("%s: expected %v, got %v", "test get used ports", test.ports, ports)
}
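The point of exporting getResourceRequest and getUsedPorts (now GetResourceRequest and GetUsedPorts) is that an equivalence cache can classify pods by the same inputs the predicates consume: two pods with identical resource requests and host ports (among other properties) can reuse each other's predicate results. A minimal sketch of such a classifier, assuming a hypothetical getEquivalenceHash helper; the hash scheme is illustrative, not part of this PR:

package scheduler

import (
	"fmt"
	"hash/fnv"
	"sort"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)

// getEquivalenceHash (hypothetical) folds the predicate inputs into a single
// key, so pods with identical requests and host ports share an equivalence class.
func getEquivalenceHash(pod *api.Pod) uint32 {
	h := fnv.New32a()

	req := predicates.GetResourceRequest(pod)
	fmt.Fprintf(h, "cpu=%d;mem=%d;gpu=%d", req.MilliCPU, req.Memory, req.NvidiaGPU)

	// Map iteration order is randomized in Go; sort the ports so the
	// resulting key is deterministic.
	used := predicates.GetUsedPorts(pod)
	ports := make([]int, 0, len(used))
	for p := range used {
		ports = append(ports, p)
	}
	sort.Ints(ports)
	for _, p := range ports {
		fmt.Fprintf(h, ";port=%d", p)
	}
	return h.Sum32()
}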
62 changes: 50 additions & 12 deletions plugin/pkg/scheduler/factory/factory.go
@@ -78,6 +78,10 @@ type ConfigFactory struct {

scheduledPodPopulator *cache.Controller
nodePopulator *cache.Controller
pvPopulator *cache.Controller
pvcPopulator *cache.Controller
servicePopulator *cache.Controller
controllerPopulator *cache.Controller

schedulerCache schedulercache.Cache

@@ -93,6 +97,9 @@ type ConfigFactory struct {

// Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
FailureDomains string

// Equivalence class cache
EquivalencePodCache *scheduler.EquivalenceCache
}
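The scheduler.EquivalenceCache type referenced above is defined outside this diff. As a rough sketch of the shape such a cache can take (all names below are assumptions, not the actual type):

package scheduler

import "sync"

// EquivalenceCache (sketch): per node, per equivalence class, the cached
// verdicts of the fit predicates.
type EquivalenceCache struct {
	mu sync.RWMutex
	// nodeName -> equivalence hash -> "does an equivalent pod fit this node?"
	cachedPredicateResults map[string]map[uint32]bool
}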

// Initializes the factory.
@@ -147,15 +154,48 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int)
},
)

// TODO(harryz) need to fill all the handlers here and below for equivalence cache
c.PVLister.Store, c.pvPopulator = cache.NewInformer(
c.createPersistentVolumeLW(),
&api.PersistentVolume{},
0,
cache.ResourceEventHandlerFuncs{},
)

c.PVCLister.Store, c.pvcPopulator = cache.NewInformer(
c.createPersistentVolumeClaimLW(),
&api.PersistentVolumeClaim{},
0,
cache.ResourceEventHandlerFuncs{},
)

c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
c.createServiceLW(),
&api.Service{},
0,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

c.ControllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
c.createControllerLW(),
&api.ReplicationController{},
0,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

Member:
This PR is already quite huge, and is far from trivial. Could you please split the more obvious parts into a separate PR? I would like to see a PR with only:

  • the above changes to predicates.go
  • these Populators added here (but just with empty handler functions)

That PR would definitely not be controversial, so we would be able to merge it fast and continue working on this one.

Contributor Author:
Yes, good idea, that makes the scope much clearer.

return c
}
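The ResourceEventHandlerFuncs above are deliberately left empty (see the TODO at the top of the block). One way they might later be filled in, sketched here with an assumed Invalidate method on EquivalencePodCache that is not this PR's API: a PersistentVolume add or delete can change volume-related predicate outcomes, so the corresponding cached results have to be dropped.

// Hypothetical handler wiring for the pv informer above; the Invalidate
// method and the predicate key are illustrative assumptions.
func (c *ConfigFactory) pvEventHandlers() cache.ResourceEventHandlerFuncs {
	invalidate := func(_ interface{}) {
		if c.EquivalencePodCache != nil {
			// Cached volume-predicate results may be stale once the set of
			// PersistentVolumes changes; drop them for all nodes.
			c.EquivalencePodCache.Invalidate("MaxPDVolumeCountPredicate")
		}
	}
	return cache.ResourceEventHandlerFuncs{
		AddFunc:    invalidate,
		DeleteFunc: invalidate,
	}
}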

// TODO(harryz) need to update all the handlers here and below for equivalence cache
func (c *ConfigFactory) addPodToCache(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod: %v", obj)
return
}

if err := c.schedulerCache.AddPod(pod); err != nil {
glog.Errorf("scheduler cache AddPod failed: %v", err)
}
@@ -172,6 +212,7 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
glog.Errorf("cannot convert newObj to *api.Pod: %v", newObj)
return
}

if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
glog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
@@ -204,6 +245,7 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) {
glog.Errorf("cannot convert to *api.Node: %v", obj)
return
}

if err := c.schedulerCache.AddNode(node); err != nil {
glog.Errorf("scheduler cache AddNode failed: %v", err)
}
@@ -220,6 +262,7 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
glog.Errorf("cannot convert newObj to *api.Node: %v", newObj)
return
}

if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
glog.Errorf("scheduler cache UpdateNode failed: %v", err)
}
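Per the TODO above, the pod and node cache handlers will eventually need to touch the equivalence cache as well: adding, updating, or removing a pod changes what else fits on its node, so that node's cached predicate results become stale. A sketch, with an assumed method name:

// Hypothetical helper that addPodToCache and friends could call; the method
// name on EquivalencePodCache is an assumption.
func (c *ConfigFactory) invalidateCachedPredicatesForNode(nodeName string) {
	if c.EquivalencePodCache != nil {
		c.EquivalencePodCache.InvalidateAllCachedPredicateItemOfNode(nodeName)
	}
}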
@@ -407,20 +450,15 @@ func (f *ConfigFactory) Run() {
// Begin populating nodes.
go f.nodePopulator.Run(f.StopEverything)

-// Watch PVs & PVCs
-// They may be listed frequently for scheduling constraints, so provide a local up-to-date cache.
-cache.NewReflector(f.createPersistentVolumeLW(), &api.PersistentVolume{}, f.PVLister.Store, 0).RunUntil(f.StopEverything)
-cache.NewReflector(f.createPersistentVolumeClaimLW(), &api.PersistentVolumeClaim{}, f.PVCLister.Store, 0).RunUntil(f.StopEverything)
+// Begin populating pv & pvc
+go f.pvPopulator.Run(f.StopEverything)
+go f.pvcPopulator.Run(f.StopEverything)

-// Watch and cache all service objects. Scheduler needs to find all pods
-// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
-// Cache this locally.
-cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Indexer, 0).RunUntil(f.StopEverything)
+// Begin populating services
+go f.servicePopulator.Run(f.StopEverything)

-// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
-// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
-// Cache this locally.
-cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Indexer, 0).RunUntil(f.StopEverything)
+// Begin populating controllers
+go f.controllerPopulator.Run(f.StopEverything)

// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
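The substitution above is behavior-preserving today but matters for the next step: a bare Reflector only mirrors objects into a store, while the informer behind each populator also dispatches event handlers, which is the hook equivalence-cache invalidation needs. Side by side (a sketch; lw and stopCh stand in for the factory's list-watch and stop channel):

// Old pattern: the Reflector keeps the store in sync, but no code observes
// the individual add/update/delete events.
reflStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
cache.NewReflector(lw, &api.PersistentVolume{}, reflStore, 0).RunUntil(stopCh)

// New pattern: the informer keeps an equivalent store in sync and also
// invokes the registered handlers.
infStore, populator := cache.NewInformer(lw, &api.PersistentVolume{}, 0,
	cache.ResourceEventHandlerFuncs{})
_ = infStore
go populator.Run(stopCh)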
4 changes: 4 additions & 0 deletions plugin/pkg/scheduler/generic_scheduler.go
@@ -71,6 +71,8 @@ type genericScheduler struct {
lastNodeIndex uint64

cachedNodeInfoMap map[string]*schedulercache.NodeInfo

equivalenceCache *EquivalenceCache
}

// Schedule tries to schedule the given pod to one of node in the node list.
@@ -99,6 +101,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeLister)
return "", err
}

// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here

trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders)
if err != nil {
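The TODO inside Schedule marks where the cache would short-circuit predicate evaluation. A sketch of what that branch could look like, reusing the getEquivalenceHash idea from earlier and assuming hypothetical lookup/store methods on EquivalenceCache (types simplified):

// Hypothetical cached filter path; findNodesThatFit is the existing helper
// called by Schedule, everything else here is an assumption.
func (g *genericScheduler) findNodesThatFitCached(pod *api.Pod, nodes []*api.Node) ([]*api.Node, error) {
	hash := getEquivalenceHash(pod) // see the earlier sketch
	if cached, ok := g.equivalenceCache.LookupFilterResult(hash); ok {
		// An equivalent pod was already filtered; reuse its result.
		return cached, nil
	}
	filtered, _, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders)
	if err != nil {
		return nil, err
	}
	g.equivalenceCache.StoreFilterResult(hash, filtered)
	return filtered, nil
}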
1 change: 1 addition & 0 deletions plugin/pkg/scheduler/generic_scheduler_test.go
@@ -300,6 +300,7 @@ func TestGenericScheduler(t *testing.T) {
for _, name := range test.nodes {
cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
}

scheduler := NewGenericScheduler(
cache, test.predicates, algorithm.EmptyMetadataProducer,
test.prioritizers, []algorithm.SchedulerExtender{})