Skip to content

Commit

Permalink
Add PodGroup as controller watch source (#1666)
Browse files Browse the repository at this point in the history
* Add PodGroup as controller watch source

common.ReconcileJobs stops creating pods while the related
PodGroup is unschedulable. When the PodGroup later becomes schedulable,
the reconcile loop is not triggered because there is no watch source
for the PodGroup.

Signed-off-by: Peng Gao <peng.gao.dut@gmail.com>

* Fix the no PodGroup kind error

Continue without watching the PodGroup if the PodGroup
is not installed.

Signed-off-by: Peng Gao <peng.gao.dut@gmail.com>

* Remove the PodGroup CRD

Signed-off-by: Peng Gao <peng.gao.dut@gmail.com>

* Remove extra blank lines

Signed-off-by: Peng Gao <peng.gao.dut@gmail.com>

Signed-off-by: Peng Gao <peng.gao.dut@gmail.com>
  • Loading branch information
ggaaooppeenngg committed Oct 20, 2022
1 parent e5f372f commit ab9f3ec
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/training-operator.v1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"

commonutil "github.com/kubeflow/common/pkg/util"
kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
Expand All @@ -45,6 +46,7 @@ var (
// init adds the API types the operator works with to the package-level scheme:
// the clientgoscheme types, the kubeflow.org/v1 types, and the Volcano
// scheduling v1beta1 types (the package that declares PodGroup).
// NOTE(review): each registration is wrapped in utilruntime.Must, which
// presumably aborts startup if AddToScheme returns an error — confirm.
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kubeflowv1.AddToScheme(scheme))
utilruntime.Must(v1beta1.AddToScheme(scheme))
// Scaffold marker: keep this as the last line of the registrations.
//+kubebuilder:scaffold:scheme
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/controller.v1/mpi/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
Expand Down Expand Up @@ -242,6 +243,19 @@ func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}

// skip watching podgroup if PodGroup is not installed
_, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"},
v1beta1.SchemeGroupVersion.Version)
if err == nil {
// inject watching for job related PodGroup
if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kubeflowv1.MPIJob{},
}, predicates); err != nil {
return err
}
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/controller.v1/mpi/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
//+kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -65,6 +67,8 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())

err = v1beta1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = kubeflowv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

Expand Down
18 changes: 18 additions & 0 deletions pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
Expand Down Expand Up @@ -216,6 +217,23 @@ func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}

// skip watching podgroup if podgroup is not installed
_, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"},
v1beta1.SchemeGroupVersion.Version)
if err == nil {
// inject watching for job related podgroup
if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kubeflowv1.MXJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFunc(r.Expectations),
UpdateFunc: util.OnDependentUpdateFunc(&r.JobController),
DeleteFunc: util.OnDependentDeleteFunc(r.Expectations),
}); err != nil {
return err
}
}

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/controller.v1/mxnet/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
//+kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -55,6 +56,8 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())

err = v1beta1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = kubeflowv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

Expand Down
17 changes: 17 additions & 0 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
Expand Down Expand Up @@ -214,6 +215,22 @@ func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
}); err != nil {
return err
}
// skip watching podgroup if podgroup is not installed
_, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"},
v1beta1.SchemeGroupVersion.Version)
if err == nil {
// inject watching for job related podgroup
if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kubeflowv1.PyTorchJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFunc(r.Expectations),
UpdateFunc: util.OnDependentUpdateFunc(&r.JobController),
DeleteFunc: util.OnDependentDeleteFunc(r.Expectations),
}); err != nil {
return err
}
}

return nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
//+kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -65,6 +66,8 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())

err = v1beta1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = kubeflowv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

Expand Down
3 changes: 3 additions & 0 deletions pkg/controller.v1/tensorflow/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -71,6 +72,8 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())

err = v1beta1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = kubeflowv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

Expand Down
17 changes: 17 additions & 0 deletions pkg/controller.v1/tensorflow/tfjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
Expand Down Expand Up @@ -235,6 +236,22 @@ func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
}); err != nil {
return err
}
// skip watching podgroup if podgroup is not installed
_, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"},
v1beta1.SchemeGroupVersion.Version)
if err == nil {
// inject watching for job related podgroup
if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kubeflowv1.TFJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFunc(r.Expectations),
UpdateFunc: util.OnDependentUpdateFunc(&r.JobController),
DeleteFunc: util.OnDependentDeleteFunc(r.Expectations),
}); err != nil {
return err
}
}

return nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller.v1/xgboost/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -55,6 +56,8 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())

err = v1beta1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = kubeflowv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

Expand Down
17 changes: 17 additions & 0 deletions pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
Expand Down Expand Up @@ -221,6 +222,22 @@ func (r *XGBoostJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
}); err != nil {
return err
}
// skip watching podgroup if podgroup is not installed
_, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"},
v1beta1.SchemeGroupVersion.Version)
if err == nil {
// inject watching for job related podgroup
if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kubeflowv1.XGBoostJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFunc(r.Expectations),
UpdateFunc: util.OnDependentUpdateFunc(&r.JobController),
DeleteFunc: util.OnDependentDeleteFunc(r.Expectations),
}); err != nil {
return err
}
}

return nil
}
Expand Down

0 comments on commit ab9f3ec

Please sign in to comment.