diff --git a/cmd/mpi-operator/app/options/additional.go b/cmd/mpi-operator/app/options/additional.go new file mode 100644 index 00000000..5663daa4 --- /dev/null +++ b/cmd/mpi-operator/app/options/additional.go @@ -0,0 +1,38 @@ +package options + +import ( + mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" + informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" + kubeinformers "k8s.io/client-go/informers" + kubeclientset "k8s.io/client-go/kubernetes" + schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" + schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" + volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" + volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions" +) + +type NamespaceParserFunc func(namespace string, kubeClient kubeclientset.Interface) ([]string, error) + +type NamespaceOptions struct { + Namespaces NamespaceParserFunc +} + +func DefaultNamespaceParser(namespace string, kubeClient kubeclientset.Interface) ([]string, error) { + return []string{namespace}, nil +} + +type KubeInformerFunc func(namespaces []string, kubeClient kubeclientset.Interface) kubeinformers.SharedInformerFactory +type MpiJobInformerFunc func(namespaces []string, mpiJobClient mpijobclientset.Interface) informers.SharedInformerFactory +type VolcanoInformerFunc func(namespaces []string, volcanoClient volcanoclient.Interface) volcanoinformers.SharedInformerFactory +type SchedulerPluginsInformerFunc func(namespaces []string, schedClient schedclientset.Interface) schedinformers.SharedInformerFactory + +type InformerOptions struct { + KubeInformer KubeInformerFunc + MpiJobInformer MpiJobInformerFunc + VolcanoInformer VolcanoInformerFunc + SchedulerPluginsInformer SchedulerPluginsInformerFunc +} +type AdditionalOptions struct { + NamespaceOptions + InformerOptions +} diff --git a/cmd/mpi-operator/app/options/options.go b/cmd/mpi-operator/app/options/options.go index 432e914a..1bf7134f 100644 --- a/cmd/mpi-operator/app/options/options.go +++ b/cmd/mpi-operator/app/options/options.go @@ -16,6 +16,7 @@ package options import ( "flag" + "github.com/kubeflow/mpi-operator/pkg/informers" "os" "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" @@ -38,11 +39,21 @@ type ServerOption struct { LockNamespace string QPS int Burst int + + NamespaceOptions + InformerOptions } // NewServerOption creates a new CMServer with a default config. func NewServerOption() *ServerOption { s := ServerOption{} + + s.Namespaces = DefaultNamespaceParser + s.KubeInformer = informers.DefaultKubeInformer + s.MpiJobInformer = informers.DefaultMpiJobInformer + s.VolcanoInformer = informers.DefaultVolcanoInformer + s.SchedulerPluginsInformer = informers.DefaultSchedulerPluginsInformer + return &s } diff --git a/cmd/mpi-operator/app/server.go b/cmd/mpi-operator/app/server.go index 4024ca63..d3f96547 100644 --- a/cmd/mpi-operator/app/server.go +++ b/cmd/mpi-operator/app/server.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" kubeapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" - kubeinformers "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" clientgokubescheme "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -45,7 +44,6 @@ import ( "github.com/kubeflow/mpi-operator/cmd/mpi-operator/app/options" mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" kubeflowscheme "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme" - informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" controllersv1 "github.com/kubeflow/mpi-operator/pkg/controller" "github.com/kubeflow/mpi-operator/pkg/version" ) @@ -82,13 +80,6 @@ func Run(opt *options.ServerOption) error { version.PrintVersionAndExit(apiVersion) } - namespace := opt.Namespace - if namespace == corev1.NamespaceAll { - klog.Info("Using cluster scoped operator") - } else { - klog.Infof("Scoping operator to namespace %s", namespace) - } - // To help debugging, immediately log version. klog.Infof("%+v", version.Info(apiVersion)) @@ -118,9 +109,23 @@ func Run(opt *options.ServerOption) error { if err != nil { return err } - if !checkCRDExists(mpiJobClientSet, namespace) { - klog.Info("CRD doesn't exist. Exiting") - os.Exit(1) + + namespaces, err := opt.Namespaces(opt.Namespace, kubeClient) + if err != nil { + return err + } + + if namespaces[0] == corev1.NamespaceAll { + klog.Info("Using cluster scoped operator") + } else { + klog.Infof("Scoping operator to namespace %s", namespaces) + } + + for _, namespace := range namespaces { + if !checkCRDExists(mpiJobClientSet, namespace) { + klog.Info("CRD doesn't exist. Exiting") + os.Exit(1) + } } // Add mpi-job-controller types to the default Kubernetes Scheme so Events @@ -132,14 +137,8 @@ func Run(opt *options.ServerOption) error { // Set leader election start function. run := func(ctx context.Context) { - var kubeInformerFactoryOpts []kubeinformers.SharedInformerOption - var kubeflowInformerFactoryOpts []informers.SharedInformerOption - if namespace != metav1.NamespaceAll { - kubeInformerFactoryOpts = append(kubeInformerFactoryOpts, kubeinformers.WithNamespace(namespace)) - kubeflowInformerFactoryOpts = append(kubeflowInformerFactoryOpts, informers.WithNamespace(namespace)) - } - kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...) - kubeflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, kubeflowInformerFactoryOpts...) + kubeInformerFactory := opt.KubeInformer(namespaces, kubeClient) + mpiJobInformerFactory := opt.MpiJobInformer(namespaces, mpiJobClientSet) controller, err := controllersv1.NewMPIJobController( kubeClient, @@ -152,14 +151,15 @@ func Run(opt *options.ServerOption) error { kubeInformerFactory.Batch().V1().Jobs(), kubeInformerFactory.Core().V1().Pods(), kubeInformerFactory.Scheduling().V1().PriorityClasses(), - kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(), - namespace, opt.GangSchedulingName) + mpiJobInformerFactory.Kubeflow().V2beta1().MPIJobs(), + opt.VolcanoInformer, opt.SchedulerPluginsInformer, + namespaces, opt.GangSchedulingName) if err != nil { klog.Fatalf("Failed to setup the controller") } go kubeInformerFactory.Start(ctx.Done()) - go kubeflowInformerFactory.Start(ctx.Done()) + go mpiJobInformerFactory.Start(ctx.Done()) if controller.PodGroupCtrl != nil { controller.PodGroupCtrl.StartInformerFactory(ctx.Done()) } diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index c161de9e..2b49b31d 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -272,10 +272,13 @@ func NewMPIJobController( podInformer coreinformers.PodInformer, priorityClassInformer schedulinginformers.PriorityClassInformer, mpiJobInformer informers.MPIJobInformer, - namespace, gangSchedulingName string) (*MPIJobController, error) { + volcanoInformerFunc options.VolcanoInformerFunc, schedulerPluginsInformerFunc options.SchedulerPluginsInformerFunc, + namespaces []string, gangSchedulingName string) (*MPIJobController, error) { return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient, configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer, - priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName) + priorityClassInformer, mpiJobInformer, &clock.RealClock{}, + volcanoInformerFunc, schedulerPluginsInformerFunc, + namespaces, gangSchedulingName) } // NewMPIJobControllerWithClock returns a new MPIJob controller. @@ -292,7 +295,8 @@ func NewMPIJobControllerWithClock( priorityClassInformer schedulinginformers.PriorityClassInformer, mpiJobInformer informers.MPIJobInformer, clock clock.WithTicker, - namespace, gangSchedulingName string) (*MPIJobController, error) { + volcanoInformer options.VolcanoInformerFunc, schedulerPluginsInformer options.SchedulerPluginsInformerFunc, + namespaces []string, gangSchedulingName string) (*MPIJobController, error) { // Create event broadcaster. klog.V(4).Info("Creating event broadcaster") @@ -311,10 +315,12 @@ func NewMPIJobControllerWithClock( priorityClassLister = priorityClassInformer.Lister() priorityClassSynced = priorityClassInformer.Informer().HasSynced if gangSchedulingName == options.GangSchedulerVolcano { - podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister) + volcanoInformer := volcanoInformer(namespaces, volcanoClient) + podGroupCtrl = NewVolcanoCtrl(volcanoClient, volcanoInformer, priorityClassLister) } else if len(gangSchedulingName) != 0 { // Use scheduler-plugins as a default gang-scheduler. - podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister) + pgInformer := schedulerPluginsInformer(namespaces, schedClient) + podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, pgInformer, gangSchedulingName, priorityClassLister) } if podGroupCtrl != nil { podGroupSynced = podGroupCtrl.PodGroupSharedIndexInformer().HasSynced diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 51c6a8e7..656fa555 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -16,6 +16,7 @@ package controller import ( "fmt" + "github.com/kubeflow/mpi-operator/pkg/informers" "reflect" "testing" "time" @@ -46,7 +47,7 @@ import ( kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/fake" "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme" - informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" + mpijobinformers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" ) var ( @@ -86,6 +87,8 @@ type fixture struct { objects []runtime.Object gangSchedulingName string + + namespaces []string } func newFixture(t *testing.T, gangSchedulingName string) *fixture { @@ -94,6 +97,7 @@ func newFixture(t *testing.T, gangSchedulingName string) *fixture { f.objects = []runtime.Object{} f.kubeObjects = []runtime.Object{} f.gangSchedulingName = gangSchedulingName + f.namespaces = []string{metav1.NamespaceAll} return f } @@ -155,11 +159,12 @@ func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.T return mpiJob } -func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { +func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, mpijobinformers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { f.client = fake.NewSimpleClientset(f.objects...) f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...) - i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc()) - k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc()) + + i := informers.DefaultMpiJobInformer(f.namespaces, f.client) + k8sI := informers.DefaultKubeInformer(f.namespaces, f.kubeClient) c, err := NewMPIJobControllerWithClock( f.kubeClient, @@ -174,7 +179,8 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info k8sI.Scheduling().V1().PriorityClasses(), i.Kubeflow().V2beta1().MPIJobs(), clock, - metav1.NamespaceAll, + informers.DefaultVolcanoInformer, informers.DefaultSchedulerPluginsInformer, + f.namespaces, f.gangSchedulingName, ) if err != nil { diff --git a/pkg/controller/podgroup.go b/pkg/controller/podgroup.go index c01d691b..30aeac40 100644 --- a/pkg/controller/podgroup.go +++ b/pkg/controller/podgroup.go @@ -73,12 +73,7 @@ type VolcanoCtrl struct { schedulerName string } -func NewVolcanoCtrl(c volcanoclient.Interface, watchNamespace string, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl { - var informerFactoryOpts []volcanoinformers.SharedInformerOption - if watchNamespace != metav1.NamespaceAll { - informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(watchNamespace)) - } - informerFactory := volcanoinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...) +func NewVolcanoCtrl(c volcanoclient.Interface, informerFactory volcanoinformers.SharedInformerFactory, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl { return &VolcanoCtrl{ Client: c, InformerFactory: informerFactory, @@ -204,14 +199,9 @@ type SchedulerPluginsCtrl struct { func NewSchedulerPluginsCtrl( c schedclientset.Interface, - watchNamespace, schedulerName string, + pgInformerFactory schedinformers.SharedInformerFactory, schedulerName string, pcLister schedulinglisters.PriorityClassLister, ) *SchedulerPluginsCtrl { - var informerFactoryOpts []schedinformers.SharedInformerOption - if watchNamespace != metav1.NamespaceAll { - informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(watchNamespace)) - } - pgInformerFactory := schedinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...) return &SchedulerPluginsCtrl{ Client: c, InformerFactory: pgInformerFactory, diff --git a/pkg/informers/informers.go b/pkg/informers/informers.go new file mode 100644 index 00000000..6983970a --- /dev/null +++ b/pkg/informers/informers.go @@ -0,0 +1,47 @@ +package informers + +import ( + mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" + informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" + kubeclientset "k8s.io/client-go/kubernetes" + schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" + schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" + volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" + volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions" +) + +func DefaultKubeInformer(namespaces []string, kubeClient kubeclientset.Interface) kubeinformers.SharedInformerFactory { + var kubeInformerFactoryOpts []kubeinformers.SharedInformerOption + if namespaces[0] != metav1.NamespaceAll { + kubeInformerFactoryOpts = append(kubeInformerFactoryOpts, kubeinformers.WithNamespace(namespaces[0])) + } + + return kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...) +} + +func DefaultMpiJobInformer(namespaces []string, mpiJobClient mpijobclientset.Interface) informers.SharedInformerFactory { + var kubeflowInformerFactoryOpts []informers.SharedInformerOption + if namespaces[0] != metav1.NamespaceAll { + kubeflowInformerFactoryOpts = append(kubeflowInformerFactoryOpts, informers.WithNamespace(namespaces[0])) + } + + return informers.NewSharedInformerFactoryWithOptions(mpiJobClient, 0, kubeflowInformerFactoryOpts...) +} + +func DefaultVolcanoInformer(namespaces []string, volcanoClient volcanoclient.Interface) volcanoinformers.SharedInformerFactory { + var informerFactoryOpts []volcanoinformers.SharedInformerOption + if namespaces[0] != metav1.NamespaceAll { + informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(namespaces[0])) + } + return volcanoinformers.NewSharedInformerFactoryWithOptions(volcanoClient, 0, informerFactoryOpts...) +} + +func DefaultSchedulerPluginsInformer(namespaces []string, schedClient schedclientset.Interface) schedinformers.SharedInformerFactory { + var informerFactoryOpts []schedinformers.SharedInformerOption + if namespaces[0] != metav1.NamespaceAll { + informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(namespaces[0])) + } + return schedinformers.NewSharedInformerFactoryWithOptions(schedClient, 0, informerFactoryOpts...) +} diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index 87c7eaf6..b5018b08 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -17,6 +17,7 @@ package integration import ( "context" "fmt" + "github.com/kubeflow/mpi-operator/pkg/informers" "testing" "time" @@ -29,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/reference" "k8s.io/utils/pointer" @@ -41,7 +41,6 @@ import ( kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme" - informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" "github.com/kubeflow/mpi-operator/pkg/controller" ) @@ -828,8 +827,10 @@ func startController( mpiClient clientset.Interface, gangSchedulerCfg *gangSchedulerConfig, ) { - kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kClient, 0) - mpiInformerFactory := informers.NewSharedInformerFactory(mpiClient, 0) + namespaces := []string{metav1.NamespaceAll} + + kubeInformerFactory := informers.DefaultKubeInformer(namespaces, kClient) + mpiInformerFactory := informers.DefaultMpiJobInformer(namespaces, mpiClient) var ( volcanoClient volcanoclient.Interface schedClient schedclientset.Interface @@ -855,7 +856,8 @@ func startController( kubeInformerFactory.Core().V1().Pods(), kubeInformerFactory.Scheduling().V1().PriorityClasses(), mpiInformerFactory.Kubeflow().V2beta1().MPIJobs(), - metav1.NamespaceAll, schedulerName, + informers.DefaultVolcanoInformer, informers.DefaultSchedulerPluginsInformer, + namespaces, schedulerName, ) if err != nil { panic(err)