Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for statefulset spreading to the scheduler #41708

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions plugin/cmd/kube-scheduler/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library",
"//pkg/client/leaderelection:go_default_library",
Expand Down
3 changes: 3 additions & 0 deletions plugin/cmd/kube-scheduler/app/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"

appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
Expand Down Expand Up @@ -79,6 +80,7 @@ func createScheduler(
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer extensionsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
recorder record.EventRecorder,
) (*scheduler.Scheduler, error) {
Expand All @@ -90,6 +92,7 @@ func createScheduler(
pvcInformer,
replicationControllerInformer,
replicaSetInformer,
statefulSetInformer,
serviceInformer,
s.HardPodAffinitySymmetricWeight,
)
Expand Down
1 change: 1 addition & 0 deletions plugin/cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func Run(s *options.SchedulerServer) error {
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
recorder,
)
Expand Down
1 change: 1 addition & 0 deletions plugin/pkg/scheduler/algorithm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions plugin/pkg/scheduler/algorithm/priorities/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
Expand Down
28 changes: 19 additions & 9 deletions plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,28 @@ const maxPriority float32 = 10
const zoneWeighting = 2.0 / 3.0

type SelectorSpread struct {
serviceLister algorithm.ServiceLister
controllerLister algorithm.ControllerLister
replicaSetLister algorithm.ReplicaSetLister
serviceLister algorithm.ServiceLister
controllerLister algorithm.ControllerLister
replicaSetLister algorithm.ReplicaSetLister
statefulSetLister algorithm.StatefulSetLister
}

func NewSelectorSpreadPriority(
serviceLister algorithm.ServiceLister,
controllerLister algorithm.ControllerLister,
replicaSetLister algorithm.ReplicaSetLister) algorithm.PriorityFunction {
replicaSetLister algorithm.ReplicaSetLister,
statefulSetLister algorithm.StatefulSetLister) algorithm.PriorityFunction {
selectorSpread := &SelectorSpread{
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
}
return selectorSpread.CalculateSpreadPriority
}

// Returns selectors of services, RCs and RSs matching the given pod.
func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister) []labels.Selector {
func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
selectors := make([]labels.Selector, 0, 3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bsalamat Any reason, why this is having 3, was it b/c it was previously having 3 selectors ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that it was related to the 3 selectors. New elements are appended to the "selectors" array in loops, so there could be potentially more than 3 elements. Appending to an array resizes it if there is not enough room in the array. So, this should be find with the new changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bsalamat Thanks for responding back. My concern was not a programmatic one, rather from code readability perspective. A new developer coming in like me, may get confused with - something like why 3? I may be missing something here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code predates my changes. I don't know why 3 was chosen. I don't see any particular significance.

if services, err := sl.GetPodServices(pod); err == nil {
for _, service := range services {
Expand All @@ -76,11 +79,18 @@ func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.Controll
}
}
}
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
for _, ss := range sss {
if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
selectors = append(selectors, selector)
}
}
}
return selectors
}

func (s *SelectorSpread) getSelectors(pod *v1.Pod) []labels.Selector {
return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister)
return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
}

// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
Expand Down Expand Up @@ -60,6 +61,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
rcs []*v1.ReplicationController
rss []*extensions.ReplicaSet
services []*v1.Service
sss []*apps.StatefulSet
expectedList schedulerapi.HostPriorityList
test string
}{
Expand Down Expand Up @@ -202,6 +204,19 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "service with partial pod label matches with service and replica set",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
sss: []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "service with partial pod label matches with service and replica set",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
pods: []*v1.Pod{
Expand Down Expand Up @@ -230,6 +245,19 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "disjoined service and replica set should be treated equally",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
sss: []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "disjoined service and replica set should be treated equally",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
pods: []*v1.Pod{
Expand All @@ -256,6 +284,19 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
test: "Replica set with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
sss: []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use StatefulSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
test: "StatefulSet with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
pods: []*v1.Pod{
Expand All @@ -281,14 +322,28 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "Another replication set with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
sss: []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}},
// We use StatefulSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "Another stateful set with partial pod label matches",
},
}

for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
selectorSpread := SelectorSpread{
serviceLister: schedulertesting.FakeServiceLister(test.services),
controllerLister: schedulertesting.FakeControllerLister(test.rcs),
replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss),
serviceLister: schedulertesting.FakeServiceLister(test.services),
controllerLister: schedulertesting.FakeControllerLister(test.rcs),
replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss),
statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss),
}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeNodeList(test.nodes))
if err != nil {
Expand Down Expand Up @@ -346,6 +401,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
rcs []*v1.ReplicationController
rss []*extensions.ReplicaSet
services []*v1.Service
sss []*apps.StatefulSet
expectedList schedulerapi.HostPriorityList
test string
}{
Expand Down Expand Up @@ -493,9 +549,10 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
selectorSpread := SelectorSpread{
serviceLister: schedulertesting.FakeServiceLister(test.services),
controllerLister: schedulertesting.FakeControllerLister(test.rcs),
replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss),
serviceLister: schedulertesting.FakeServiceLister(test.services),
controllerLister: schedulertesting.FakeControllerLister(test.rcs),
replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss),
statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss),
}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes))
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions plugin/pkg/scheduler/algorithm/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package algorithm
import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
Expand Down Expand Up @@ -127,3 +128,19 @@ type EmptyReplicaSetLister struct{}
func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) (rss []*extensions.ReplicaSet, err error) {
return nil, nil
}

// StatefulSetLister interface represents anything that can produce a list of StatefulSet; the list is consumed by a scheduler.
type StatefulSetLister interface {
// Gets the StatefulSet for the given pod.
GetPodStatefulSets(*v1.Pod) ([]*apps.StatefulSet, error)
}

var _ StatefulSetLister = &EmptyStatefulSetLister{}

// EmptyStatefulSetLister implements StatefulSetLister on []apps.StatefulSet returning empty data.
type EmptyStatefulSetLister struct{}

// GetPodStatefulSets of EmptyStatefulSetLister returns nil.
func (f EmptyStatefulSetLister) GetPodStatefulSets(pod *v1.Pod) (sss []*apps.StatefulSet, err error) {
return nil, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
).CreateFromConfig(policy); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func init() {
"ServiceSpreadingPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{})
return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
},
Weight: 1,
},
Expand Down Expand Up @@ -180,7 +180,7 @@ func defaultPriorities() sets.String {
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister)
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
},
Weight: 1,
},
Expand Down
1 change: 1 addition & 0 deletions plugin/pkg/scheduler/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
Expand Down
4 changes: 3 additions & 1 deletion plugin/pkg/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
algorithmpredicates "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
Expand Down Expand Up @@ -518,7 +519,8 @@ func TestZeroRequest(t *testing.T) {
Function: algorithmpriorities.NewSelectorSpreadPriority(
schedulertesting.FakeServiceLister([]*v1.Service{}),
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{})),
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})),
Weight: 1,
},
}
Expand Down
2 changes: 2 additions & 0 deletions plugin/pkg/scheduler/factory/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ go_library(
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//plugin/pkg/scheduler:go_default_library",
Expand Down