Fix a bug that the PodGroupCtrl can not list priorityclass (#561)
* Fix a potentially null pointer error

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>

* Fix a bug that the PodGroupCtrl can not list priorityclass

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>

* Refactoring setups for gang-scheduling

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>

---------

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
tenzen-y committed Jun 8, 2023
1 parent 2da8e05 commit fda0532
Showing 5 changed files with 115 additions and 90 deletions.
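In outline, this change moves gang-scheduling setup out of cmd/mpi-operator/app/server.go and into the MPIJob controller itself: NewMPIJobController now takes the volcano and scheduler-plugins clientsets plus the watch namespace and the configured gang-scheduler name, constructs the PodGroupControl internally, and exposes it as the exported PodGroupCtrl field. A consolidated sketch of the new call site in server.go, abridged from the hunk below (a fragment, not a standalone program; names follow the diff):

    controller := controllersv1.NewMPIJobController(
        kubeClient,
        mpiJobClientSet,
        volcanoClientSet,
        schedClientSet,
        kubeInformerFactory.Core().V1().ConfigMaps(),
        kubeInformerFactory.Core().V1().Secrets(),
        kubeInformerFactory.Core().V1().Services(),
        kubeInformerFactory.Batch().V1().Jobs(),
        kubeInformerFactory.Core().V1().Pods(),
        kubeInformerFactory.Scheduling().V1().PriorityClasses(),
        kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(),
        namespace, opt.GangSchedulingName)

    go kubeInformerFactory.Start(ctx.Done())
    go kubeflowInformerFactory.Start(ctx.Done())
    // The PodGroup informer factory is started only when a gang scheduler is configured.
    if controller.PodGroupCtrl != nil {
        controller.PodGroupCtrl.StartInformerFactory(ctx.Done())
    }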
27 changes: 7 additions & 20 deletions cmd/mpi-operator/app/server.go
@@ -30,7 +30,6 @@ import (
kubeapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
kubeinformers "k8s.io/client-go/informers"
schedulinginformers "k8s.io/client-go/informers/scheduling/v1"
kubeclientset "k8s.io/client-go/kubernetes"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -142,36 +141,24 @@ func Run(opt *options.ServerOption) error {
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...)
kubeflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, kubeflowInformerFactoryOpts...)

// For the gang scheduling
var podGroupCtrl controllersv1.PodGroupControl
if opt.GangSchedulingName == options.GangSchedulerVolcano {
podGroupCtrl = controllersv1.NewVolcanoCtrl(volcanoClientSet, namespace)
} else if len(opt.GangSchedulingName) != 0 {
// Use scheduler-plugins as a default gang-scheduler.

podGroupCtrl = controllersv1.NewSchedulerPluginsCtrl(schedClientSet, namespace, opt.GangSchedulingName)
}
var priorityClassInformer schedulinginformers.PriorityClassInformer
if podGroupCtrl != nil {
priorityClassInformer = kubeInformerFactory.Scheduling().V1().PriorityClasses()
}

controller := controllersv1.NewMPIJobController(
kubeClient,
mpiJobClientSet,
podGroupCtrl,
volcanoClientSet,
schedClientSet,
kubeInformerFactory.Core().V1().ConfigMaps(),
kubeInformerFactory.Core().V1().Secrets(),
kubeInformerFactory.Core().V1().Services(),
kubeInformerFactory.Batch().V1().Jobs(),
kubeInformerFactory.Core().V1().Pods(),
priorityClassInformer,
kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs())
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(),
namespace, opt.GangSchedulingName)

go kubeInformerFactory.Start(ctx.Done())
go kubeflowInformerFactory.Start(ctx.Done())
if podGroupCtrl != nil {
podGroupCtrl.StartInformerFactory(ctx.Done())
if controller.PodGroupCtrl != nil {
controller.PodGroupCtrl.StartInformerFactory(ctx.Done())
}

// Set leader election start function.
67 changes: 41 additions & 26 deletions pkg/controller/mpi_job_controller.go
@@ -55,7 +55,10 @@ import (
"k8s.io/klog"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

"github.com/kubeflow/mpi-operator/cmd/mpi-operator/app/options"
kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
"github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/validation"
clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
@@ -211,8 +214,8 @@ type MPIJobController struct {
kubeClient kubernetes.Interface
// kubeflowClient is a clientset for our own API group.
kubeflowClient clientset.Interface
// podGroupCtrl is a client for PodGroups (volcano and scheduler-plugins).
podGroupCtrl PodGroupControl
// PodGroupCtrl is a client for PodGroups (volcano and scheduler-plugins).
PodGroupCtrl PodGroupControl

configMapLister corelisters.ConfigMapLister
configMapSynced cache.InformerSynced
@@ -251,32 +254,36 @@ type MPIJobController struct {
func NewMPIJobController(
kubeClient kubernetes.Interface,
kubeflowClient clientset.Interface,
podGroupCtrl PodGroupControl,
volcanoClient volcanoclient.Interface,
schedClient schedclientset.Interface,
configMapInformer coreinformers.ConfigMapInformer,
secretInformer coreinformers.SecretInformer,
serviceInformer coreinformers.ServiceInformer,
jobInformer batchinformers.JobInformer,
podInformer coreinformers.PodInformer,
priorityClassInformer schedulinginformers.PriorityClassInformer,
mpiJobInformer informers.MPIJobInformer) *MPIJobController {
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, podGroupCtrl,
configMapInformer, secretInformer, serviceInformer, jobInformer,
podInformer, priorityClassInformer, mpiJobInformer, &clock.RealClock{})
mpiJobInformer informers.MPIJobInformer,
namespace, gangSchedulingName string) *MPIJobController {
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient,
configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName)
}

// NewMPIJobControllerWithClock returns a new MPIJob controller.
func NewMPIJobControllerWithClock(
kubeClient kubernetes.Interface,
kubeflowClient clientset.Interface,
podGroupCtrl PodGroupControl,
volcanoClient volcanoclient.Interface,
schedClient schedclientset.Interface,
configMapInformer coreinformers.ConfigMapInformer,
secretInformer coreinformers.SecretInformer,
serviceInformer coreinformers.ServiceInformer,
jobInformer batchinformers.JobInformer,
podInformer coreinformers.PodInformer,
priorityClassInformer schedulinginformers.PriorityClassInformer,
mpiJobInformer informers.MPIJobInformer,
clock clock.WithTicker) *MPIJobController {
clock clock.WithTicker,
namespace, gangSchedulingName string) *MPIJobController {

// Create event broadcaster.
klog.V(4).Info("Creating event broadcaster")
@@ -285,21 +292,29 @@ func NewMPIJobControllerWithClock(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

// For the gang scheduling.
var (
podGroupCtrl PodGroupControl
podGroupSynced cache.InformerSynced
priorityClassLister schedulinglisters.PriorityClassLister
priorityClassSynced cache.InformerSynced
)
if podGroupCtrl != nil {
podGroupSynced = podGroupCtrl.PodGroupSharedIndexInformer().HasSynced
if gangSchedulingName == options.GangSchedulerVolcano {
podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace)
} else if len(gangSchedulingName) != 0 {
// Use scheduler-plugins as a default gang-scheduler.
priorityClassLister = priorityClassInformer.Lister()
priorityClassSynced = priorityClassInformer.Informer().HasSynced
podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
}
if podGroupCtrl != nil {
podGroupSynced = podGroupCtrl.PodGroupSharedIndexInformer().HasSynced
}

controller := &MPIJobController{
kubeClient: kubeClient,
kubeflowClient: kubeflowClient,
podGroupCtrl: podGroupCtrl,
PodGroupCtrl: podGroupCtrl,
configMapLister: configMapInformer.Lister(),
configMapSynced: configMapInformer.Informer().HasSynced,
secretLister: secretInformer.Lister(),
@@ -398,7 +413,7 @@ func (c *MPIJobController) Run(threadiness int, stopCh <-chan struct{}) error {
c.podSynced,
c.mpiJobSynced,
}
if c.podGroupCtrl != nil {
if c.PodGroupCtrl != nil {
synced = append(synced, c.podGroupSynced, c.priorityClassSynced)
}
if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
@@ -578,7 +593,7 @@ func (c *MPIJobController) syncHandler(key string) error {

if !isMPIJobSuspended(mpiJob) {
// Get the PodGroup for this MPIJob
if c.podGroupCtrl != nil {
if c.PodGroupCtrl != nil {
if podGroup, err := c.getOrCreatePodGroups(mpiJob); podGroup == nil || err != nil {
return err
}
@@ -638,7 +653,7 @@ func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
return err
}
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
if c.podGroupCtrl != nil {
if c.PodGroupCtrl != nil {
if err := c.deletePodGroups(mpiJob); err != nil {
return err
}
@@ -673,11 +688,11 @@ func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job

// getOrCreatePodGroups will create a PodGroup for gang scheduling by volcano.
func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob) (metav1.Object, error) {
newPodGroup := c.podGroupCtrl.newPodGroup(mpiJob)
podGroup, err := c.podGroupCtrl.getPodGroup(newPodGroup.GetNamespace(), newPodGroup.GetName())
newPodGroup := c.PodGroupCtrl.newPodGroup(mpiJob)
podGroup, err := c.PodGroupCtrl.getPodGroup(newPodGroup.GetNamespace(), newPodGroup.GetName())
// If the PodGroup doesn't exist, we'll create it.
if errors.IsNotFound(err) {
return c.podGroupCtrl.createPodGroup(context.TODO(), newPodGroup)
return c.PodGroupCtrl.createPodGroup(context.TODO(), newPodGroup)
}
// If an error occurs during Get/Create, we'll requeue the item so we
// can attempt processing again later. This could have been caused by a
@@ -693,15 +708,15 @@ func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob) (metav1
return nil, fmt.Errorf(msg)
}

if !c.podGroupCtrl.pgSpecsAreEqual(podGroup, newPodGroup) {
return c.podGroupCtrl.updatePodGroup(context.TODO(), podGroup, newPodGroup)
if !c.PodGroupCtrl.pgSpecsAreEqual(podGroup, newPodGroup) {
return c.PodGroupCtrl.updatePodGroup(context.TODO(), podGroup, newPodGroup)
}
return podGroup, nil
}

// deletePodGroups will delete a PodGroup when MPIJob have done.
func (c *MPIJobController) deletePodGroups(mpiJob *kubeflow.MPIJob) error {
podGroup, err := c.podGroupCtrl.getPodGroup(mpiJob.Namespace, mpiJob.Name)
podGroup, err := c.PodGroupCtrl.getPodGroup(mpiJob.Namespace, mpiJob.Name)
if err != nil {
if errors.IsNotFound(err) {
return nil
Expand All @@ -718,7 +733,7 @@ func (c *MPIJobController) deletePodGroups(mpiJob *kubeflow.MPIJob) error {
}

// If the PodGroup exist, we'll delete it.
err = c.podGroupCtrl.deletePodGroup(context.TODO(), mpiJob.Namespace, mpiJob.Name)
err = c.PodGroupCtrl.deletePodGroup(context.TODO(), mpiJob.Namespace, mpiJob.Name)
// If an error occurs during Delete, we'll requeue the item so we
// can attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
@@ -1345,8 +1360,8 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
c.setupSSHOnPod(&podTemplate.Spec, mpiJob)

// add SchedulerName to podSpec
if c.podGroupCtrl != nil {
c.podGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name)
if c.PodGroupCtrl != nil {
c.PodGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name)
}

return &corev1.Pod{
@@ -1403,8 +1418,8 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob) corev
podTemplate.Labels[key] = value
}
// add SchedulerName to podSpec
if c.podGroupCtrl != nil {
c.podGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name)
if c.PodGroupCtrl != nil {
c.PodGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name)
}
podTemplate.Spec.Hostname = launcherName
podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name.
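Consolidating the interleaved lines in the NewMPIJobControllerWithClock hunk above, the gang-scheduling setup now performed inside the controller reads roughly as follows (a sketch, abridged; it assumes the constructor parameters shown in the diff). Handing the SchedulerPluginsCtrl a PriorityClassLister backed by the controller's own PriorityClass informer is presumably what resolves the "can not list priorityclass" issue in the commit title:

    // For the gang scheduling.
    var (
        podGroupCtrl        PodGroupControl
        podGroupSynced      cache.InformerSynced
        priorityClassLister schedulinglisters.PriorityClassLister
        priorityClassSynced cache.InformerSynced
    )
    if gangSchedulingName == options.GangSchedulerVolcano {
        podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace)
    } else if len(gangSchedulingName) != 0 {
        // Any other non-empty scheduler name selects the scheduler-plugins PodGroup API.
        priorityClassLister = priorityClassInformer.Lister()
        priorityClassSynced = priorityClassInformer.Informer().HasSynced
        podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
    }
    if podGroupCtrl != nil {
        podGroupSynced = podGroupCtrl.PodGroupSharedIndexInformer().HasSynced
    }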
30 changes: 10 additions & 20 deletions pkg/controller/mpi_job_controller_test.go
@@ -32,7 +32,6 @@ import (
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid"
kubeinformers "k8s.io/client-go/informers"
schedulinginformers "k8s.io/client-go/informers/scheduling/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
@@ -45,7 +44,6 @@ import (
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanofake "volcano.sh/apis/pkg/client/clientset/versioned/fake"

"github.com/kubeflow/mpi-operator/cmd/mpi-operator/app/options"
kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/fake"
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
@@ -164,29 +162,21 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc())

var podGroupCtrl PodGroupControl
if f.gangSchedulingName == options.GangSchedulerVolcano {
podGroupCtrl = NewVolcanoCtrl(f.volcanoClient, metav1.NamespaceAll)
} else if len(f.gangSchedulingName) != 0 {
podGroupCtrl = NewSchedulerPluginsCtrl(f.schedClient, metav1.NamespaceAll, "default-scheduler")
}
var priorityClassInformer schedulinginformers.PriorityClassInformer
if podGroupCtrl != nil {
priorityClassInformer = k8sI.Scheduling().V1().PriorityClasses()
}

c := NewMPIJobControllerWithClock(
f.kubeClient,
f.client,
podGroupCtrl,
f.volcanoClient,
f.schedClient,
k8sI.Core().V1().ConfigMaps(),
k8sI.Core().V1().Secrets(),
k8sI.Core().V1().Services(),
k8sI.Batch().V1().Jobs(),
k8sI.Core().V1().Pods(),
priorityClassInformer,
k8sI.Scheduling().V1().PriorityClasses(),
i.Kubeflow().V2beta1().MPIJobs(),
clock,
metav1.NamespaceAll,
f.gangSchedulingName,
)

c.configMapSynced = alwaysReady
@@ -232,15 +222,15 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
}
}

if podGroupCtrl != nil {
if c.PodGroupCtrl != nil {
for _, podGroup := range f.volcanoPodGroupLister {
err := podGroupCtrl.PodGroupSharedIndexInformer().GetIndexer().Add(podGroup)
err := c.PodGroupCtrl.PodGroupSharedIndexInformer().GetIndexer().Add(podGroup)
if err != nil {
fmt.Println("Failed to create volcano pod group")
}
}
for _, podGroup := range f.schedPodGroupLister {
err := podGroupCtrl.PodGroupSharedIndexInformer().GetIndexer().Add(podGroup)
err := c.PodGroupCtrl.PodGroupSharedIndexInformer().GetIndexer().Add(podGroup)
if err != nil {
fmt.Println("Failed to create scheduler-plugins pod group")
}
@@ -282,8 +272,8 @@ func (f *fixture) runController(mpiJobName string, startInformers, expectError b
defer close(stopCh)
i.Start(stopCh)
k8sI.Start(stopCh)
if c.podGroupCtrl != nil {
c.podGroupCtrl.StartInformerFactory(stopCh)
if c.PodGroupCtrl != nil {
c.PodGroupCtrl.StartInformerFactory(stopCh)
}
}

23 changes: 16 additions & 7 deletions pkg/controller/podgroup.go
@@ -191,17 +191,22 @@ type SchedulerPluginsCtrl struct {
schedulerName string
}

func NewSchedulerPluginsCtrl(c schedclientset.Interface, watchNamespace, schedulerName string) *SchedulerPluginsCtrl {
func NewSchedulerPluginsCtrl(
c schedclientset.Interface,
watchNamespace, schedulerName string,
pcLister schedulinglisters.PriorityClassLister,
) *SchedulerPluginsCtrl {
var informerFactoryOpts []schedinformers.SharedInformerOption
if watchNamespace != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(watchNamespace))
}
informerFactory := schedinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
pgInformerFactory := schedinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
return &SchedulerPluginsCtrl{
Client: c,
InformerFactory: informerFactory,
PodGroupInformer: informerFactory.Scheduling().V1alpha1().PodGroups(),
schedulerName: schedulerName,
Client: c,
InformerFactory: pgInformerFactory,
PodGroupInformer: pgInformerFactory.Scheduling().V1alpha1().PodGroups(),
PriorityClassLister: pcLister,
schedulerName: schedulerName,
}
}

@@ -231,6 +236,10 @@ func (s *SchedulerPluginsCtrl) newPodGroup(mpiJob *kubeflow.MPIJob) metav1.Objec
scheduleTimeoutSec = schedPolicy.ScheduleTimeoutSeconds
}
minMember := calculateMinAvailable(mpiJob)
var minResources corev1.ResourceList
if origin := s.calculatePGMinResources(minMember, mpiJob); origin != nil {
minResources = *origin
}
return &schedv1alpha1.PodGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: schedv1alpha1.SchemeGroupVersion.String(),
@@ -246,7 +255,7 @@ func (s *SchedulerPluginsCtrl) newPodGroup(mpiJob *kubeflow.MPIJob) metav1.Objec
Spec: schedv1alpha1.PodGroupSpec{
MinMember: *minMember,
ScheduleTimeoutSeconds: scheduleTimeoutSec,
MinResources: *s.calculatePGMinResources(minMember, mpiJob),
MinResources: minResources,
},
}
}
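The "potentially null pointer error" from the commit message is the MinResources handling in newPodGroup above: calculatePGMinResources may return nil, and the previous code dereferenced its result unconditionally when building the PodGroupSpec. A minimal sketch of the guard now in place (abridged from the hunk above, not a standalone program):

    // calculatePGMinResources may return nil, so dereference it only after a
    // nil check and fall back to an empty ResourceList.
    var minResources corev1.ResourceList
    if origin := s.calculatePGMinResources(minMember, mpiJob); origin != nil {
        minResources = *origin
    }
    spec := schedv1alpha1.PodGroupSpec{
        MinMember:              *minMember,
        ScheduleTimeoutSeconds: scheduleTimeoutSec,
        MinResources:           minResources, // empty when origin was nil
    }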