replace kubeclient with kubeclientset in scheduler factory #34084

Merged
7 changes: 3 additions & 4 deletions plugin/cmd/kube-scheduler/app/server.go
@@ -28,11 +28,11 @@ import (

"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+ unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/runtime"
@@ -88,7 +88,6 @@ func Run(s *options.SchedulerServer) error {
kubeconfig.QPS = s.KubeAPIQPS
kubeconfig.Burst = int(s.KubeAPIBurst)

- kubeClient, err := client.New(kubeconfig)
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
@@ -115,7 +114,7 @@ func Run(s *options.SchedulerServer) error {
glog.Fatal(server.ListenAndServe())
}()

- configFactory := factory.NewConfigFactory(kubeClient, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)
+ configFactory := factory.NewConfigFactory(leaderElectionClient, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)

Contributor Author:
I do; I still haven't spent much time on removing it completely. Once all tests pass I can refactor the code further.

Contributor Author:
Looks like the core clientset provides the Events functions, so the client can be removed completely.

Contributor Author:
Nope, the unversioned client (1) differs from the clientset core client (2). Reverting the change.

  1. EventInterface.Patch(event *api.Event, data []byte) (*api.Event, error)
  2. EventInterface.Patch(name string, pt api.PatchType, data []byte, subresources ...string) (result *api.Event, err error)
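To make the mismatch concrete, here is a minimal sketch of the two shapes as standalone Go interfaces, abridged from the signatures quoted above; the interface names are illustrative and are not taken from the Kubernetes tree:

```go
package eventsketch

import "k8s.io/kubernetes/pkg/api"

// unversionedEvents mirrors (1): the unversioned client patches a
// concrete Event object directly.
type unversionedEvents interface {
	Patch(event *api.Event, data []byte) (*api.Event, error)
}

// clientsetEvents mirrors (2): the clientset core client patches by
// name, patch type, payload, and optional subresources.
type clientsetEvents interface {
	Patch(name string, pt api.PatchType, data []byte, subresources ...string) (*api.Event, error)
}
```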

Contributor Author:
@caesarxuchao that looks exactly like the solution I am looking for. Thanks! I will update the PR and remove the unversioned.Client.

config, err := createConfig(s, configFactory)

if err != nil {
@@ -125,7 +124,7 @@ func Run(s *options.SchedulerServer) error {
eventBroadcaster := record.NewBroadcaster()
config.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: s.SchedulerName})
eventBroadcaster.StartLogging(glog.Infof)
- eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
+ eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: leaderElectionClient.Core().Events("")})

sched := scheduler.New(config)

@@ -25,8 +25,8 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apimachinery/registered"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
@@ -408,7 +408,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
- client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})

if _, err := factory.NewConfigFactory(client, "some-scheduler-name", api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains).CreateFromConfig(policy); err != nil {
t.Errorf("%s: Error constructing: %v", v, err)
32 changes: 16 additions & 16 deletions plugin/pkg/scheduler/factory/factory.go
@@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime"
@@ -53,7 +53,7 @@ const (

// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
- Client *client.Client
+ Client clientset.Interface
// queue for pods that need scheduling
PodQueue *cache.FIFO
// a means to list all known scheduled pods.
@@ -96,7 +96,7 @@ type ConfigFactory struct {
}

// Initializes the factory.
- func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) *ConfigFactory {
+ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) *ConfigFactory {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)

@@ -478,48 +478,48 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
// scheduled.
func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWatch {
selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + string(api.PodSucceeded) + ",status.phase!=" + string(api.PodFailed))
return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, selector)
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "pods", api.NamespaceAll, selector)
Member:
Why not just build the ListWatch functions with factory.Client.Core().Pods(api.NamespaceAll).List(selector) and factory.Client.Core().Pods(api.NamespaceAll).Watch(selector)? Building a ListWatch func on top of a RESTClient extracted from a client that is already able to list and watch looks wrong to me.

Contributor Author:
I agree with that. Though the scheduler is not the only occurrence of this pattern; there are other places in the kubelet, apiserver, proxy, etc. that are subject to the same replacement. I would suggest opening a separate PR for it.

Member:
Ok. That's fair. Could you open an issue to track it?

Member:
I see you are calling GetRESTClient() in your other PRs as well. The clientset really shouldn't expose the RESTClient. It was a hack to get the rate limiter. I don't want to encourage more uses of GetRESTClient(). Could you fix the NewListWatchFromClient in this PR?

Member:
The reason we can't build a function like that is that the Go type system makes it impossible to write a general interface, since the specific X/XList types are different for each object. :(

Member:
@caesarxuchao I think the best option, if you don't want people to break the glass, is to add a NewListWatch function to each client.

Member:
> The reason why not build a function like that is the go type system makes it impossible to make a general interface

Right. I missed it.

@ingvagabund mostly looks good. I left a comment regarding the Event methods. Let me know if you need more help.

Contributor Author:
> @caesarxuchao I think the best option, if you don't want people to break the glass, is to add a NewListWatch function to each client.

I like this approach. That could be done as another PR on top, or as part of #34138.
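For illustration, here is a rough sketch of the kind of per-client ListWatch helper being discussed above, built on the typed Pods methods instead of a RESTClient pulled out with GetRESTClient(). Nothing below is part of this PR; the helper name and placement are invented, and a generated NewListWatch per typed client (as proposed) would replace this hand-rolled closure.

```go
package factory

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/watch"
)

// newPodListWatch is a hypothetical helper: it builds a cache.ListWatch
// from the typed Pods client rather than an extracted RESTClient.
func newPodListWatch(c clientset.Interface, ns string, selector fields.Selector) *cache.ListWatch {
	return &cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			// Apply the field selector, then list through the typed client.
			options.FieldSelector = selector
			return c.Core().Pods(ns).List(options)
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			// Same selector, same typed client, but a watch instead of a list.
			options.FieldSelector = selector
			return c.Core().Pods(ns).Watch(options)
		},
	}
}
```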

}

// Returns a cache.ListWatch that finds all pods that are
// already scheduled.
// TODO: return a ListerWatcher interface instead?
func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatch {
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(api.PodSucceeded) + ",status.phase!=" + string(api.PodFailed))
return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, selector)
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "pods", api.NamespaceAll, selector)
}

// createNodeLW returns a cache.ListWatch that gets all changes to nodes.
func (factory *ConfigFactory) createNodeLW() *cache.ListWatch {
// all nodes are considered to ensure that the scheduler cache has access to all nodes for lookups
// the NodeCondition is used to filter out the nodes that are not ready or unschedulable
// the filtered list is used as the super set of nodes to consider for scheduling
return cache.NewListWatchFromClient(factory.Client, "nodes", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "nodes", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}

// createPersistentVolumeLW returns a cache.ListWatch that gets all changes to persistentVolumes.
func (factory *ConfigFactory) createPersistentVolumeLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "persistentVolumes", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "persistentVolumes", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}

// createPersistentVolumeClaimLW returns a cache.ListWatch that gets all changes to persistentVolumeClaims.
func (factory *ConfigFactory) createPersistentVolumeClaimLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "persistentVolumeClaims", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "persistentVolumeClaims", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}

// Returns a cache.ListWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "services", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "services", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}

// Returns a cache.ListWatch that gets all changes to controllers.
func (factory *ConfigFactory) createControllerLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "replicationControllers", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "replicationControllers", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}

// Returns a cache.ListWatch that gets all changes to replicasets.
func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch {
- return cache.NewListWatchFromClient(factory.Client.ExtensionsClient, "replicasets", api.NamespaceAll, fields.ParseSelectorOrDie(""))
+ return cache.NewListWatchFromClient(factory.Client.Extensions().GetRESTClient(), "replicasets", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}

func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
@@ -547,7 +547,7 @@ func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue
// Get the pod again; it may have changed/been scheduled already.
getBackoff := initialGetBackoff
for {
- pod, err := factory.Client.Pods(podID.Namespace).Get(podID.Name)
+ pod, err := factory.Client.Core().Pods(podID.Namespace).Get(podID.Name)
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddIfNotPresent(pod)
@@ -587,26 +587,26 @@ func (ne *nodeEnumerator) Get(index int) interface{} {
}

type binder struct {
- *client.Client
+ Client clientset.Interface
}

// Bind just does a POST binding RPC.
func (b *binder) Bind(binding *api.Binding) error {
glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
- return b.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
+ return b.Client.Core().GetRESTClient().Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
// TODO: use Pods interface for binding once clusters are upgraded
// return b.Pods(binding.Namespace).Bind(binding)
}
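As an aside, the TODO above points at a typed alternative. A hedged sketch of what that could look like once clusters are upgraded, assuming the internal clientset's pod client exposes a Bind expansion method; this PR deliberately keeps the raw POST:

```go
package factory

import (
	"k8s.io/kubernetes/pkg/api"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// bindViaPodsInterface is a hypothetical helper illustrating the TODO above:
// bind through the typed Pods client rather than a raw POST to the
// "bindings" resource. It is not part of this PR.
func bindViaPodsInterface(c clientset.Interface, binding *api.Binding) error {
	return c.Core().Pods(binding.Namespace).Bind(binding)
}
```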

type podConditionUpdater struct {
- *client.Client
+ Client clientset.Interface
}

func (p *podConditionUpdater) Update(pod *api.Pod, condition *api.PodCondition) error {
glog.V(2).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
if api.UpdatePodCondition(&pod.Status, condition) {
- _, err := p.Pods(pod.Namespace).UpdateStatus(pod)
+ _, err := p.Client.Core().Pods(pod.Namespace).UpdateStatus(pod)
return err
}
return nil
18 changes: 9 additions & 9 deletions plugin/pkg/scheduler/factory/factory_test.go
@@ -28,8 +28,8 @@ import (
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
@@ -47,7 +47,7 @@ func TestCreate(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
- client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
factory := NewConfigFactory(client, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
factory.Create()
}
@@ -65,7 +65,7 @@ func TestCreateFromConfig(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
- client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
factory := NewConfigFactory(client, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)

// Pre-register some predicate and priority functions
@@ -106,7 +106,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
- client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
factory := NewConfigFactory(client, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)

configData = []byte(`{}`)
@@ -149,7 +149,7 @@ func TestDefaultErrorFunc(t *testing.T) {
mux.Handle(testapi.Default.ResourcePath("pods", "bar", "foo"), &handler)
server := httptest.NewServer(mux)
defer server.Close()
- factory := NewConfigFactory(client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}), api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
+ factory := NewConfigFactory(clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}), api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@@ -232,7 +232,7 @@ func TestBind(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
- client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
b := binder{client}

if err := b.Bind(item.binding); err != nil {
@@ -317,7 +317,7 @@ func TestResponsibleForPod(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
- client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
// factory of "default-scheduler"
factoryDefaultScheduler := NewConfigFactory(client, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
// factory of "foo-scheduler"
@@ -381,7 +381,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
server := httptest.NewServer(&handler)
// TODO: Uncomment when fix #19254
// defer server.Close()
- client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
// factory of "default-scheduler"
factory := NewConfigFactory(client, api.DefaultSchedulerName, -1, api.DefaultFailureDomains)
_, err := factory.Create()
@@ -398,7 +398,7 @@ func TestInvalidFactoryArgs(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
- client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})

testCases := []struct {
hardPodAffinitySymmetricWeight int
7 changes: 5 additions & 2 deletions test/integration/scheduler/extender_test.go
@@ -33,6 +33,8 @@ import (
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -196,6 +198,7 @@ func TestSchedulerExtender(t *testing.T) {
defer framework.DeleteTestingNamespace(ns, s, t)

restClient := client.NewOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+ clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})

extender1 := &Extender{
name: "extender1",
@@ -237,14 +240,14 @@ func TestSchedulerExtender(t *testing.T) {
}
policy.APIVersion = registered.GroupOrDie(api.GroupName).GroupVersion.String()

- schedulerConfigFactory := factory.NewConfigFactory(restClient, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
+ schedulerConfigFactory := factory.NewConfigFactory(clientSet, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy)
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: api.DefaultSchedulerName})
- eventBroadcaster.StartRecordingToSink(restClient.Events(ns.Name))
+ eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: clientSet.Core().Events("")})
scheduler.New(schedulerConfig).Run()

defer close(schedulerConfig.StopEverything)