Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement multi-scheduler: #17865

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/integration/integration.go
Expand Up @@ -171,13 +171,13 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
handler.delegate = m.Handler

// Scheduler
schedulerConfigFactory := factory.NewConfigFactory(cl, nil)
schedulerConfigFactory := factory.NewConfigFactory(cl, nil, api.DefaultSchedulerName)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
glog.Fatalf("Couldn't create scheduler config: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: api.DefaultSchedulerName})
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(cl.Events(""))
scheduler.New(schedulerConfig).Run()
Expand Down
1 change: 1 addition & 0 deletions docs/admin/kube-scheduler.md
Expand Up @@ -66,6 +66,7 @@ kube-scheduler
--policy-config-file="": File with scheduler policy configuration
--port=10251: The port that the scheduler's http service runs on
--profiling[=true]: Enable profiling via web interface host:port/debug/pprof/
--scheduler-name="default-scheduler": Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'
```

###### Auto generated by spf13/cobra on 14-Dec-2015
Expand Down
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
Expand Up @@ -288,6 +288,7 @@ run-proxy
runtime-config
save-config
scheduler-config
scheduler-name
schema-cache-dir
secure-port
serialize-image-pulls
Expand Down
5 changes: 5 additions & 0 deletions pkg/api/types.go
Expand Up @@ -2218,3 +2218,8 @@ type RangeAllocation struct {
// a single allocated address (the fifth bit on CIDR 10.0.0.0/8 is 10.0.0.4).
Data []byte `json:"data"`
}

const (
// DefaultSchedulerName is the name of the default scheduler.
DefaultSchedulerName = "default-scheduler"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smarterclayton was saying we should make these constants versioned, so you should make the same change to pkg/api/v1/types.go

)
5 changes: 5 additions & 0 deletions pkg/api/v1/types.go
Expand Up @@ -2624,3 +2624,8 @@ type RangeAllocation struct {
// Data is a bit array containing all allocated addresses in the previous segment.
Data []byte `json:"data"`
}

const (
// DefaultSchedulerName is the name of the default scheduler.
DefaultSchedulerName = "default-scheduler"
)
7 changes: 5 additions & 2 deletions plugin/cmd/kube-scheduler/app/server.go
Expand Up @@ -58,6 +58,7 @@ type SchedulerServer struct {
BindPodsBurst int
KubeAPIQPS float32
KubeAPIBurst int
SchedulerName string
}

// NewSchedulerServer creates a new SchedulerServer with default parameters
Expand All @@ -70,6 +71,7 @@ func NewSchedulerServer() *SchedulerServer {
BindPodsBurst: 100,
KubeAPIQPS: 50.0,
KubeAPIBurst: 100,
SchedulerName: api.DefaultSchedulerName,
}
return &s
}
Expand Down Expand Up @@ -107,6 +109,7 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.BindPodsBurst, "bind-pods-burst", s.BindPodsBurst, "Number of bindings per second scheduler is allowed to make during bursts")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
}

// Run runs the specified SchedulerServer. This should never exit.
Expand Down Expand Up @@ -142,14 +145,14 @@ func (s *SchedulerServer) Run(_ []string) error {
glog.Fatal(server.ListenAndServe())
}()

configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst))
configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst), s.SchedulerName)
config, err := s.createConfig(configFactory)
if err != nil {
glog.Fatalf("Failed to create scheduler configuration: %v", err)
}

eventBroadcaster := record.NewBroadcaster()
config.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
config.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: s.SchedulerName})
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))

Expand Down
Expand Up @@ -100,7 +100,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
if !reflect.DeepEqual(policy, tc.ExpectedPolicy) {
t.Errorf("%s: Expected:\n\t%#v\nGot:\n\t%#v", v, tc.ExpectedPolicy, policy)
}
_, err = factory.NewConfigFactory(nil, nil).CreateFromConfig(policy)
_, err = factory.NewConfigFactory(nil, nil, "some-scheduler-name").CreateFromConfig(policy)
if err != nil {
t.Errorf("%s: Error constructing: %v", v, err)
continue
Expand Down
34 changes: 30 additions & 4 deletions plugin/pkg/scheduler/factory/factory.go
Expand Up @@ -43,6 +43,10 @@ import (
"github.com/golang/glog"
)

const (
// SchedulerAnnotationKey is the pod annotation key whose value names the
// scheduler that should process the pod. ConfigFactory.responsibleForPod
// compares the annotation value against ConfigFactory.SchedulerName.
SchedulerAnnotationKey = "scheduler.alpha.kubernetes.io/name"
)

// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
Expand All @@ -66,10 +70,15 @@ type ConfigFactory struct {

scheduledPodPopulator *framework.Controller
modeler scheduler.SystemModeler

// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on the pod's annotation with key
// 'scheduler.alpha.kubernetes.io/name'.
SchedulerName string
}

// NewConfigFactory initializes the factory.
func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter) *ConfigFactory {
func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter, schedulerName string) *ConfigFactory {
c := &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
Expand All @@ -79,6 +88,7 @@ func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter) *Conf
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
StopEverything: make(chan struct{}),
SchedulerName: schedulerName,
}
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister)
c.modeler = modeler
Expand Down Expand Up @@ -228,16 +238,32 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
Algorithm: algo,
Binder: &binder{f.Client},
NextPod: func() *api.Pod {
pod := f.PodQueue.Pop().(*api.Pod)
glog.V(2).Infof("About to try and schedule pod %v", pod.Name)
return pod
return f.getNextPod()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use the function as the value of NextPod

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point -- I assume you mean this line and the previous and next can collapse to just
NextPod: f.getNextPod(),

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing to NextPod: f.getNextPod() would require many follow-up changes which I think is not necessary. It is not as easy as I thought. Other than that, all comments are respected in the updated PR, including an integration test.

},
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
BindPodsRateLimiter: f.BindPodsRateLimiter,
StopEverything: f.StopEverything,
}, nil
}

// getNextPod pops pods off f.PodQueue until it finds one that this factory's
// scheduler is responsible for (see responsibleForPod) and returns it. Pods
// that belong to another scheduler are popped and discarded without being
// re-queued.
func (f *ConfigFactory) getNextPod() *api.Pod {
	for {
		candidate := f.PodQueue.Pop().(*api.Pod)
		if !f.responsibleForPod(candidate) {
			// Not ours: drop it and take the next queued pod.
			continue
		}
		glog.V(4).Infof("About to try and schedule pod %v", candidate.Name)
		return candidate
	}
}

// responsibleForPod reports whether the scheduler configured by this factory
// should schedule pod, judged by the pod's SchedulerAnnotationKey annotation.
//
// A pod is claimed when the annotation value equals f.SchedulerName. In
// addition, when f.SchedulerName is api.DefaultSchedulerName, pods whose
// annotation is absent or empty are claimed as well.
func (f *ConfigFactory) responsibleForPod(pod *api.Pod) bool {
	// Indexing a nil or missing-key map yields "", so unannotated pods need
	// no separate nil check.
	requested := pod.Annotations[SchedulerAnnotationKey]
	if requested == f.SchedulerName {
		return true
	}
	// Only the default scheduler picks up pods that name no scheduler.
	return requested == "" && f.SchedulerName == api.DefaultSchedulerName
}

func getNodeConditionPredicate() cache.NodeConditionPredicate {
return func(node api.Node) bool {
for _, cond := range node.Status.Conditions {
Expand Down
76 changes: 72 additions & 4 deletions plugin/pkg/scheduler/factory/factory_test.go
Expand Up @@ -45,7 +45,7 @@ func TestCreate(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
factory := NewConfigFactory(client, nil)
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)
factory.Create()
}

Expand All @@ -63,7 +63,7 @@ func TestCreateFromConfig(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
factory := NewConfigFactory(client, nil)
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)

// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateOne)
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
factory := NewConfigFactory(client, nil)
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)

configData = []byte(`{}`)
err := latestschedulerapi.Codec.DecodeInto(configData, &policy)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestDefaultErrorFunc(t *testing.T) {
mux.Handle(testapi.Default.ResourcePath("pods", "bar", "foo"), &handler)
server := httptest.NewServer(mux)
defer server.Close()
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}), nil)
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}), nil, api.DefaultSchedulerName)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
Expand Down Expand Up @@ -302,3 +302,71 @@ func TestBackoff(t *testing.T) {
t.Errorf("expected: 1, got %s", duration.String())
}
}

// TestResponsibleForPod verifies that responsibleForPod assigns each pod to at
// most one scheduler based on its "scheduler.alpha.kubernetes.io/name"
// annotation. Two factories are built, one named api.DefaultSchedulerName and
// one named "foo-scheduler", and every test pod is checked against both.
func TestResponsibleForPod(t *testing.T) {
	handler := util.FakeHandler{
		StatusCode:   500,
		ResponseBody: "",
		T:            t,
	}
	server := httptest.NewServer(&handler)
	defer server.Close()
	client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
	// factory of "default-scheduler"
	factoryDefaultScheduler := NewConfigFactory(client, nil, api.DefaultSchedulerName)
	// factory of "foo-scheduler"
	factoryFooScheduler := NewConfigFactory(client, nil, "foo-scheduler")
	// scheduler annotations to be tested
	schedulerAnnotationFitsDefault := map[string]string{"scheduler.alpha.kubernetes.io/name": "default-scheduler"}
	schedulerAnnotationFitsFoo := map[string]string{"scheduler.alpha.kubernetes.io/name": "foo-scheduler"}
	schedulerAnnotationFitsNone := map[string]string{"scheduler.alpha.kubernetes.io/name": "bar-scheduler"}
	tests := []struct {
		name            string
		pod             *api.Pod
		pickedByDefault bool
		pickedByFoo     bool
	}{
		{
			// A pod without the "scheduler.alpha.kubernetes.io/name" annotation
			// should be picked by the default scheduler, NOT by "foo-scheduler".
			name:            "no scheduler annotation",
			pod:             &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar"}},
			pickedByDefault: true,
			pickedByFoo:     false,
		},
		{
			// A pod annotated with "scheduler.alpha.kubernetes.io/name=default-scheduler"
			// should be picked by "default-scheduler", NOT by "foo-scheduler".
			name:            "annotation names default-scheduler",
			pod:             &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar", Annotations: schedulerAnnotationFitsDefault}},
			pickedByDefault: true,
			pickedByFoo:     false,
		},
		{
			// A pod annotated with "scheduler.alpha.kubernetes.io/name=foo-scheduler"
			// should NOT be picked by "default-scheduler", but by "foo-scheduler".
			name:            "annotation names foo-scheduler",
			pod:             &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar", Annotations: schedulerAnnotationFitsFoo}},
			pickedByDefault: false,
			pickedByFoo:     true,
		},
		{
			// A pod annotated with "scheduler.alpha.kubernetes.io/name=bar-scheduler"
			// should be picked by neither "default-scheduler" nor "foo-scheduler".
			name:            "annotation names bar-scheduler",
			pod:             &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar", Annotations: schedulerAnnotationFitsNone}},
			pickedByDefault: false,
			pickedByFoo:     false,
		},
	}

	for _, test := range tests {
		podOfDefault := factoryDefaultScheduler.responsibleForPod(test.pod)
		podOfFoo := factoryFooScheduler.responsibleForPod(test.pod)
		// Report both results together so a failure shows the full routing
		// decision for the case, not just the scheduler that disagreed.
		if podOfDefault != test.pickedByDefault || podOfFoo != test.pickedByFoo {
			t.Errorf("%s: expected: {%v, %v}, got {%v, %v}", test.name, test.pickedByDefault, test.pickedByFoo, podOfDefault, podOfFoo)
		}
	}
}
2 changes: 1 addition & 1 deletion test/component/scheduler/perf/util.go
Expand Up @@ -58,7 +58,7 @@ func mustSetupScheduler() (schedulerConfigFactory *factory.ConfigFactory, destro
Burst: 5000,
})

schedulerConfigFactory = factory.NewConfigFactory(c, nil)
schedulerConfigFactory = factory.NewConfigFactory(c, nil, api.DefaultSchedulerName)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
panic("Couldn't create scheduler config")
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/events.go
Expand Up @@ -91,7 +91,7 @@ var _ = Describe("Events", func() {
"involvedObject.kind": "Pod",
"involvedObject.uid": string(podWithUid.UID),
"involvedObject.namespace": framework.Namespace.Name,
"source": "scheduler",
"source": api.DefaultSchedulerName,
}.AsSelector()
options := api.ListOptions{FieldSelector: selector}
events, err := framework.Client.Events(framework.Namespace.Name).List(options)
Expand Down
4 changes: 2 additions & 2 deletions test/integration/extender_test.go
Expand Up @@ -238,13 +238,13 @@ func TestSchedulerExtender(t *testing.T) {
}
policy.APIVersion = testapi.Default.GroupVersion().String()

schedulerConfigFactory := factory.NewConfigFactory(restClient, nil)
schedulerConfigFactory := factory.NewConfigFactory(restClient, nil, api.DefaultSchedulerName)
schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy)
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: api.DefaultSchedulerName})
eventBroadcaster.StartRecordingToSink(restClient.Events(""))
scheduler.New(schedulerConfig).Run()

Expand Down