Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make namespace parsing and informers pluggable #626

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions cmd/mpi-operator/app/options/additional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package options

import (
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions"
)

type NamespaceParserFunc func(namespace string, kubeClient kubeclientset.Interface) ([]string, error)

type NamespaceOptions struct {
Namespaces NamespaceParserFunc
}

func DefaultNamespaceParser(namespace string, kubeClient kubeclientset.Interface) ([]string, error) {
return []string{namespace}, nil
}

type KubeInformerFunc func(namespaces []string, kubeClient kubeclientset.Interface) kubeinformers.SharedInformerFactory
type MpiJobInformerFunc func(namespaces []string, mpiJobClient mpijobclientset.Interface) informers.SharedInformerFactory
type VolcanoInformerFunc func(namespaces []string, volcanoClient volcanoclient.Interface) volcanoinformers.SharedInformerFactory
type SchedulerPluginsInformerFunc func(namespaces []string, schedClient schedclientset.Interface) schedinformers.SharedInformerFactory

type InformerOptions struct {
KubeInformer KubeInformerFunc
MpiJobInformer MpiJobInformerFunc
VolcanoInformer VolcanoInformerFunc
SchedulerPluginsInformer SchedulerPluginsInformerFunc
}
type AdditionalOptions struct {
NamespaceOptions
InformerOptions
}
11 changes: 11 additions & 0 deletions cmd/mpi-operator/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package options

import (
"flag"
"github.com/kubeflow/mpi-operator/pkg/informers"
"os"

"github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
Expand All @@ -38,11 +39,21 @@ type ServerOption struct {
LockNamespace string
QPS int
Burst int

NamespaceOptions
InformerOptions
}

// NewServerOption creates a new CMServer with a default config.
func NewServerOption() *ServerOption {
s := ServerOption{}

s.Namespaces = DefaultNamespaceParser
s.KubeInformer = informers.DefaultKubeInformer
s.MpiJobInformer = informers.DefaultMpiJobInformer
s.VolcanoInformer = informers.DefaultVolcanoInformer
s.SchedulerPluginsInformer = informers.DefaultSchedulerPluginsInformer

return &s
}

Expand Down
46 changes: 23 additions & 23 deletions cmd/mpi-operator/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
kubeapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -45,7 +44,6 @@ import (
"github.com/kubeflow/mpi-operator/cmd/mpi-operator/app/options"
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
kubeflowscheme "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
controllersv1 "github.com/kubeflow/mpi-operator/pkg/controller"
"github.com/kubeflow/mpi-operator/pkg/version"
)
Expand Down Expand Up @@ -82,13 +80,6 @@ func Run(opt *options.ServerOption) error {
version.PrintVersionAndExit(apiVersion)
}

namespace := opt.Namespace
if namespace == corev1.NamespaceAll {
klog.Info("Using cluster scoped operator")
} else {
klog.Infof("Scoping operator to namespace %s", namespace)
}

// To help debugging, immediately log version.
klog.Infof("%+v", version.Info(apiVersion))

Expand Down Expand Up @@ -118,9 +109,23 @@ func Run(opt *options.ServerOption) error {
if err != nil {
return err
}
if !checkCRDExists(mpiJobClientSet, namespace) {
klog.Info("CRD doesn't exist. Exiting")
os.Exit(1)

namespaces, err := opt.Namespaces(opt.Namespace, kubeClient)
if err != nil {
return err
}

if namespaces[0] == corev1.NamespaceAll {
klog.Info("Using cluster scoped operator")
} else {
klog.Infof("Scoping operator to namespace %s", namespaces)
}

for _, namespace := range namespaces {
if !checkCRDExists(mpiJobClientSet, namespace) {
klog.Info("CRD doesn't exist. Exiting")
os.Exit(1)
}
}

// Add mpi-job-controller types to the default Kubernetes Scheme so Events
Expand All @@ -132,14 +137,8 @@ func Run(opt *options.ServerOption) error {

// Set leader election start function.
run := func(ctx context.Context) {
var kubeInformerFactoryOpts []kubeinformers.SharedInformerOption
var kubeflowInformerFactoryOpts []informers.SharedInformerOption
if namespace != metav1.NamespaceAll {
kubeInformerFactoryOpts = append(kubeInformerFactoryOpts, kubeinformers.WithNamespace(namespace))
kubeflowInformerFactoryOpts = append(kubeflowInformerFactoryOpts, informers.WithNamespace(namespace))
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...)
kubeflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, kubeflowInformerFactoryOpts...)
kubeInformerFactory := opt.KubeInformer(namespaces, kubeClient)
mpiJobInformerFactory := opt.MpiJobInformer(namespaces, mpiJobClientSet)

controller, err := controllersv1.NewMPIJobController(
kubeClient,
Expand All @@ -152,14 +151,15 @@ func Run(opt *options.ServerOption) error {
kubeInformerFactory.Batch().V1().Jobs(),
kubeInformerFactory.Core().V1().Pods(),
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(),
namespace, opt.GangSchedulingName)
mpiJobInformerFactory.Kubeflow().V2beta1().MPIJobs(),
opt.VolcanoInformer, opt.SchedulerPluginsInformer,
namespaces, opt.GangSchedulingName)
if err != nil {
klog.Fatalf("Failed to setup the controller")
}

go kubeInformerFactory.Start(ctx.Done())
go kubeflowInformerFactory.Start(ctx.Done())
go mpiJobInformerFactory.Start(ctx.Done())
if controller.PodGroupCtrl != nil {
controller.PodGroupCtrl.StartInformerFactory(ctx.Done())
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,13 @@ func NewMPIJobController(
podInformer coreinformers.PodInformer,
priorityClassInformer schedulinginformers.PriorityClassInformer,
mpiJobInformer informers.MPIJobInformer,
namespace, gangSchedulingName string) (*MPIJobController, error) {
volcanoInformerFunc options.VolcanoInformerFunc, schedulerPluginsInformerFunc options.SchedulerPluginsInformerFunc,
namespaces []string, gangSchedulingName string) (*MPIJobController, error) {
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient,
configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName)
priorityClassInformer, mpiJobInformer, &clock.RealClock{},
volcanoInformerFunc, schedulerPluginsInformerFunc,
namespaces, gangSchedulingName)
}

// NewMPIJobControllerWithClock returns a new MPIJob controller.
Expand All @@ -292,7 +295,8 @@ func NewMPIJobControllerWithClock(
priorityClassInformer schedulinginformers.PriorityClassInformer,
mpiJobInformer informers.MPIJobInformer,
clock clock.WithTicker,
namespace, gangSchedulingName string) (*MPIJobController, error) {
volcanoInformer options.VolcanoInformerFunc, schedulerPluginsInformer options.SchedulerPluginsInformerFunc,
namespaces []string, gangSchedulingName string) (*MPIJobController, error) {

// Create event broadcaster.
klog.V(4).Info("Creating event broadcaster")
Expand All @@ -311,10 +315,12 @@ func NewMPIJobControllerWithClock(
priorityClassLister = priorityClassInformer.Lister()
priorityClassSynced = priorityClassInformer.Informer().HasSynced
if gangSchedulingName == options.GangSchedulerVolcano {
podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister)
volcanoInformer := volcanoInformer(namespaces, volcanoClient)
podGroupCtrl = NewVolcanoCtrl(volcanoClient, volcanoInformer, priorityClassLister)
} else if len(gangSchedulingName) != 0 {
// Use scheduler-plugins as a default gang-scheduler.
podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
pgInformer := schedulerPluginsInformer(namespaces, schedClient)
podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, pgInformer, gangSchedulingName, priorityClassLister)
}
if podGroupCtrl != nil {
podGroupSynced = podGroupCtrl.PodGroupSharedIndexInformer().HasSynced
Expand Down
16 changes: 11 additions & 5 deletions pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"fmt"
"github.com/kubeflow/mpi-operator/pkg/informers"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -46,7 +47,7 @@ import (
kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/fake"
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
mpijobinformers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
)

var (
Expand Down Expand Up @@ -86,6 +87,8 @@ type fixture struct {
objects []runtime.Object

gangSchedulingName string

namespaces []string
}

func newFixture(t *testing.T, gangSchedulingName string) *fixture {
Expand All @@ -94,6 +97,7 @@ func newFixture(t *testing.T, gangSchedulingName string) *fixture {
f.objects = []runtime.Object{}
f.kubeObjects = []runtime.Object{}
f.gangSchedulingName = gangSchedulingName
f.namespaces = []string{metav1.NamespaceAll}
return f
}

Expand Down Expand Up @@ -155,11 +159,12 @@ func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.T
return mpiJob
}

func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, mpijobinformers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
f.client = fake.NewSimpleClientset(f.objects...)
f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...)
i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc())

i := informers.DefaultMpiJobInformer(f.namespaces, f.client)
k8sI := informers.DefaultKubeInformer(f.namespaces, f.kubeClient)

c, err := NewMPIJobControllerWithClock(
f.kubeClient,
Expand All @@ -174,7 +179,8 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
k8sI.Scheduling().V1().PriorityClasses(),
i.Kubeflow().V2beta1().MPIJobs(),
clock,
metav1.NamespaceAll,
informers.DefaultVolcanoInformer, informers.DefaultSchedulerPluginsInformer,
f.namespaces,
f.gangSchedulingName,
)
if err != nil {
Expand Down
14 changes: 2 additions & 12 deletions pkg/controller/podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ type VolcanoCtrl struct {
schedulerName string
}

func NewVolcanoCtrl(c volcanoclient.Interface, watchNamespace string, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl {
var informerFactoryOpts []volcanoinformers.SharedInformerOption
if watchNamespace != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(watchNamespace))
}
informerFactory := volcanoinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
func NewVolcanoCtrl(c volcanoclient.Interface, informerFactory volcanoinformers.SharedInformerFactory, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl {
return &VolcanoCtrl{
Client: c,
InformerFactory: informerFactory,
Expand Down Expand Up @@ -204,14 +199,9 @@ type SchedulerPluginsCtrl struct {

func NewSchedulerPluginsCtrl(
c schedclientset.Interface,
watchNamespace, schedulerName string,
pgInformerFactory schedinformers.SharedInformerFactory, schedulerName string,
pcLister schedulinglisters.PriorityClassLister,
) *SchedulerPluginsCtrl {
var informerFactoryOpts []schedinformers.SharedInformerOption
if watchNamespace != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(watchNamespace))
}
pgInformerFactory := schedinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
return &SchedulerPluginsCtrl{
Client: c,
InformerFactory: pgInformerFactory,
Expand Down
47 changes: 47 additions & 0 deletions pkg/informers/informers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package informers

import (
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions"
)

func DefaultKubeInformer(namespaces []string, kubeClient kubeclientset.Interface) kubeinformers.SharedInformerFactory {
var kubeInformerFactoryOpts []kubeinformers.SharedInformerOption
if namespaces[0] != metav1.NamespaceAll {
kubeInformerFactoryOpts = append(kubeInformerFactoryOpts, kubeinformers.WithNamespace(namespaces[0]))
}

return kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...)
}

func DefaultMpiJobInformer(namespaces []string, mpiJobClient mpijobclientset.Interface) informers.SharedInformerFactory {
var kubeflowInformerFactoryOpts []informers.SharedInformerOption
if namespaces[0] != metav1.NamespaceAll {
kubeflowInformerFactoryOpts = append(kubeflowInformerFactoryOpts, informers.WithNamespace(namespaces[0]))
}

return informers.NewSharedInformerFactoryWithOptions(mpiJobClient, 0, kubeflowInformerFactoryOpts...)
}

func DefaultVolcanoInformer(namespaces []string, volcanoClient volcanoclient.Interface) volcanoinformers.SharedInformerFactory {
var informerFactoryOpts []volcanoinformers.SharedInformerOption
if namespaces[0] != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(namespaces[0]))
}
return volcanoinformers.NewSharedInformerFactoryWithOptions(volcanoClient, 0, informerFactoryOpts...)
}

func DefaultSchedulerPluginsInformer(namespaces []string, schedClient schedclientset.Interface) schedinformers.SharedInformerFactory {
var informerFactoryOpts []schedinformers.SharedInformerOption
if namespaces[0] != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(namespaces[0]))
}
return schedinformers.NewSharedInformerFactoryWithOptions(schedClient, 0, informerFactoryOpts...)
}
12 changes: 7 additions & 5 deletions test/integration/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package integration
import (
"context"
"fmt"
"github.com/kubeflow/mpi-operator/pkg/informers"
"testing"
"time"

Expand All @@ -29,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/reference"
"k8s.io/utils/pointer"
Expand All @@ -41,7 +41,6 @@ import (
kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
"github.com/kubeflow/mpi-operator/pkg/controller"
)

Expand Down Expand Up @@ -828,8 +827,10 @@ func startController(
mpiClient clientset.Interface,
gangSchedulerCfg *gangSchedulerConfig,
) {
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kClient, 0)
mpiInformerFactory := informers.NewSharedInformerFactory(mpiClient, 0)
namespaces := []string{metav1.NamespaceAll}

kubeInformerFactory := informers.DefaultKubeInformer(namespaces, kClient)
mpiInformerFactory := informers.DefaultMpiJobInformer(namespaces, mpiClient)
var (
volcanoClient volcanoclient.Interface
schedClient schedclientset.Interface
Expand All @@ -855,7 +856,8 @@ func startController(
kubeInformerFactory.Core().V1().Pods(),
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
mpiInformerFactory.Kubeflow().V2beta1().MPIJobs(),
metav1.NamespaceAll, schedulerName,
informers.DefaultVolcanoInformer, informers.DefaultSchedulerPluginsInformer,
namespaces, schedulerName,
)
if err != nil {
panic(err)
Expand Down
Loading