Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add LabelSelectorSpreadPriority for ReplicaSet in scheduler #21074

Merged
merged 1 commit into from
Feb 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 55 additions & 0 deletions plugin/pkg/scheduler/algorithm/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/labels"
)

Expand Down Expand Up @@ -140,3 +142,56 @@ func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api

return
}

// ReplicaSetLister interface represents anything that can produce a list of ReplicaSet; the list is consumed by a scheduler.
type ReplicaSetLister interface {
// Lists all the replicasets
List() ([]extensions.ReplicaSet, error)
// Gets the replicasets for the given pod
GetPodReplicaSets(*api.Pod) ([]extensions.ReplicaSet, error)
}

// EmptyReplicaSetLister implements ReplicaSetLister on []extensions.ReplicaSet returning empty data
type EmptyReplicaSetLister struct{}

// List returns nil
func (f EmptyReplicaSetLister) List() ([]extensions.ReplicaSet, error) {
return nil, nil
}

// GetPodReplicaSets returns nil
func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
return nil, nil
}

// FakeReplicaSetLister implements ControllerLister on []extensions.ReplicaSet for test purposes.
type FakeReplicaSetLister []extensions.ReplicaSet

// List returns []extensions.ReplicaSet, the list of all ReplicaSets.
func (f FakeReplicaSetLister) List() ([]extensions.ReplicaSet, error) {
return f, nil
}

// GetPodReplicaSets gets the ReplicaSets that have the selector that match the labels on the given pod
func (f FakeReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
var selector labels.Selector

for _, rs := range f {
if rs.Namespace != pod.Namespace {
continue
}
selector, err = unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return
}

if selector.Matches(labels.Set(pod.Labels)) {
rss = append(rss, rs)
}
}
if len(rss) == 0 {
err = fmt.Errorf("Could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}

return
}
3 changes: 2 additions & 1 deletion plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestZeroRequest(t *testing.T) {
// This should match the configuration in defaultPriorities() in
// plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
// to test what's actually in production.
[]algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}},
[]algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{}), algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})), Weight: 1}},
algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}), []algorithm.SchedulerExtender{})
if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down
18 changes: 14 additions & 4 deletions plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ type SelectorSpread struct {
podLister algorithm.PodLister
serviceLister algorithm.ServiceLister
controllerLister algorithm.ControllerLister
replicaSetLister algorithm.ReplicaSetLister
}

func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister) algorithm.PriorityFunction {
func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister) algorithm.PriorityFunction {
selectorSpread := &SelectorSpread{
podLister: podLister,
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
}
return selectorSpread.CalculateSpreadPriority
}
Expand Down Expand Up @@ -86,10 +88,18 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
}
}
controllers, err := s.controllerLister.GetPodControllers(pod)
rcs, err := s.controllerLister.GetPodControllers(pod)
if err == nil {
for _, controller := range controllers {
selectors = append(selectors, labels.SelectorFromSet(controller.Spec.Selector))
for _, rc := range rcs {
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
}
}
rss, err := s.replicaSetLister.GetPodReplicaSets(pod)
if err == nil {
for _, rs := range rss {
if selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
selectors = append(selectors, selector)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"testing"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
wellknownlabels "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
Expand All @@ -48,6 +50,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
pods []*api.Pod
nodes []string
rcs []api.ReplicationController
rss []extensions.ReplicaSet
services []api.Service
expectedList schedulerapi.HostPriorityList
test string
Expand Down Expand Up @@ -177,6 +180,20 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "service with partial pod label matches with service and replication controller",
},
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "service with partial pod label matches with service and replication controller",
},
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}}},
pods: []*api.Pod{
Expand All @@ -191,6 +208,20 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "disjoined service and replication controller should be treated equally",
},
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}}},
pods: []*api.Pod{
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "disjoined service and replication controller should be treated equally",
},
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{
Expand All @@ -204,6 +235,19 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}},
test: "Replication controller with partial pod label matches",
},
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}},
test: "Replication controller with partial pod label matches",
},
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{
Expand All @@ -216,11 +260,24 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "Replication controller with partial pod label matches",
},
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "Replication controller with partial pod label matches",
},
}

for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)}
selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down Expand Up @@ -273,6 +330,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
pods []*api.Pod
nodes []string
rcs []api.ReplicationController
rss []extensions.ReplicaSet
services []api.Service
expectedList schedulerapi.HostPriorityList
test string
Expand Down Expand Up @@ -420,7 +478,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {

for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)}
selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ import (
"reflect"
"testing"

"k8s.io/kubernetes/pkg/api/testapi"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"net/http/httptest"
)

func TestCompatibility_v1_Scheduler(t *testing.T) {
Expand Down Expand Up @@ -100,7 +104,18 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
if !reflect.DeepEqual(policy, tc.ExpectedPolicy) {
t.Errorf("%s: Expected:\n\t%#v\nGot:\n\t%#v", v, tc.ExpectedPolicy, policy)
}
if _, err := factory.NewConfigFactory(nil, "some-scheduler-name").CreateFromConfig(policy); err != nil {

handler := utiltesting.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
// TODO: Uncomment when fix #19254
// defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})

if _, err := factory.NewConfigFactory(client, "some-scheduler-name").CreateFromConfig(policy); err != nil {
t.Errorf("%s: Error constructing: %v", v, err)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func init() {
"ServiceSpreadingPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, algorithm.EmptyControllerLister{})
return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{})
},
Weight: 1,
},
Expand Down Expand Up @@ -141,7 +141,7 @@ func defaultPriorities() sets.String {
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, args.ControllerLister)
return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, args.ControllerLister, args.ReplicaSetLister)
},
Weight: 1,
},
Expand Down
19 changes: 17 additions & 2 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/api/validation"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apis/extensions"
)

const (
Expand All @@ -66,6 +67,8 @@ type ConfigFactory struct {
ServiceLister *cache.StoreToServiceLister
// a means to list all controllers
ControllerLister *cache.StoreToReplicationControllerLister
// a means to list all replicasets
ReplicaSetLister *cache.StoreToReplicaSetLister

// Close this to stop all reflectors
StopEverything chan struct{}
Expand All @@ -91,6 +94,7 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
StopEverything: make(chan struct{}),
SchedulerName: schedulerName,
}
Expand Down Expand Up @@ -188,6 +192,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
PodLister: f.PodLister,
ServiceLister: f.ServiceLister,
ControllerLister: f.ControllerLister,
ReplicaSetLister: f.ReplicaSetLister,
// All fit predicates only need to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
NodeInfo: &predicates.CachedNodeInfo{f.NodeLister},
Expand Down Expand Up @@ -220,15 +225,20 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
cache.NewReflector(f.createPersistentVolumeClaimLW(), &api.PersistentVolumeClaim{}, f.PVCLister.Store, 0).RunUntil(f.StopEverything)

// Watch and cache all service objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers, so that it can spread them correctly.
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)

// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers, so that it can spread them correctly.
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Store, 0).RunUntil(f.StopEverything)

// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Store, 0).RunUntil(f.StopEverything)

r := rand.New(rand.NewSource(time.Now().UnixNano()))

algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r)
Expand Down Expand Up @@ -332,6 +342,11 @@ func (factory *ConfigFactory) createControllerLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "replicationControllers", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}

// Returns a cache.ListWatch that gets all changes to replicasets.
func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client.ExtensionsClient, "replicasets", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}

func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
return func(pod *api.Pod, err error) {
if err == scheduler.ErrNoNodesAvailable {
Expand Down
1 change: 1 addition & 0 deletions plugin/pkg/scheduler/factory/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type PluginFactoryArgs struct {
PodLister algorithm.PodLister
ServiceLister algorithm.ServiceLister
ControllerLister algorithm.ControllerLister
ReplicaSetLister algorithm.ReplicaSetLister
NodeLister algorithm.NodeLister
NodeInfo predicates.NodeInfo
PVInfo predicates.PersistentVolumeInfo
Expand Down