Skip to content

Commit

Permalink
Use podGroup instead of PDB in v1beta2
Browse files Browse the repository at this point in the history
  • Loading branch information
thandayuthapani committed Mar 18, 2019
1 parent 0e3756a commit 8e9e4c9
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 32 deletions.
15 changes: 15 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ required = [
name = "github.com/stretchr/testify"
version = "1.2.2"

[[constraint]]
name = "github.com/kubernetes-sigs/kube-batch"
version = "v0.3"

[[constraint]]
name = "github.com/sirupsen/logrus"
version = "v1.0.4"
Expand Down
22 changes: 14 additions & 8 deletions cmd/tf-operator.v1beta1/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
controller "github.com/kubeflow/tf-operator/pkg/controller.v1beta1/tensorflow"
"github.com/kubeflow/tf-operator/pkg/util/signals"
"github.com/kubeflow/tf-operator/pkg/version"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -91,7 +92,7 @@ func Run(opt *options.ServerOption) error {
}

// Create clients.
kubeClientSet, leaderElectionClientSet, tfJobClientSet, err := createClientSets(kcfg)
kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
if err != nil {
return err
}
Expand All @@ -103,7 +104,7 @@ func Run(opt *options.ServerOption) error {
unstructuredInformer := controller.NewUnstructuredTFJobInformer(kcfg, opt.Namespace)

// Create tf controller.
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, kubeBatchClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)

// Start informer goroutines.
go kubeInformerFactory.Start(stopCh)
Expand Down Expand Up @@ -161,32 +162,37 @@ func Run(opt *options.ServerOption) error {
return nil
}

func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, error) {
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, kubebatchclient.Interface, error) {

crdClient, err := crdclient.NewForConfig(config)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

checkCRDExists(crdClient, v1beta1.TFCRD)

kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "tf-operator"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

tfJobClientSet, err := tfjobclientset.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

return kubeClientSet, leaderElectionClientSet, tfJobClientSet, nil
kubeBatchClientSet, err := kubebatchclient.NewForConfig(restclientset.AddUserAgent(config, "kube-batch"))
if err != nil {
return nil, nil, nil, nil, err
}

return kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, nil
}

func checkCRDExists(clientset crdclient.Interface, crdName string) {
Expand Down
21 changes: 13 additions & 8 deletions cmd/tf-operator.v1beta2/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
controller "github.com/kubeflow/tf-operator/pkg/controller.v1beta2/tensorflow"
"github.com/kubeflow/tf-operator/pkg/util/signals"
"github.com/kubeflow/tf-operator/pkg/version"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -91,7 +92,7 @@ func Run(opt *options.ServerOption) error {
}

// Create clients.
kubeClientSet, leaderElectionClientSet, tfJobClientSet, err := createClientSets(kcfg)
kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
if err != nil {
return err
}
Expand All @@ -103,7 +104,7 @@ func Run(opt *options.ServerOption) error {
unstructuredInformer := controller.NewUnstructuredTFJobInformer(kcfg, opt.Namespace)

// Create tf controller.
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, kubeBatchClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)

// Start informer goroutines.
go kubeInformerFactory.Start(stopCh)
Expand Down Expand Up @@ -161,32 +162,36 @@ func Run(opt *options.ServerOption) error {
return nil
}

func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, error) {
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, kubebatchclient.Interface, error) {

crdClient, err := crdclient.NewForConfig(config)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

checkCRDExists(crdClient, v1beta2.TFCRD)

kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "tf-operator"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

tfJobClientSet, err := tfjobclientset.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

return kubeClientSet, leaderElectionClientSet, tfJobClientSet, nil
kubeBatchClientSet, err := kubebatchclient.NewForConfig(restclientset.AddUserAgent(config, "kube-batch"))
if err != nil {
return nil, nil, nil, nil, err
}
return kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, nil
}

func checkCRDExists(clientset crdclient.Interface, crdName string) {
Expand Down
39 changes: 39 additions & 0 deletions examples/crd/crd-podgroup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: podgroups.scheduling.incubator.k8s.io
spec:
group: scheduling.incubator.k8s.io
names:
kind: PodGroup
plural: podgroups
scope: Namespaced
validation:
openAPIV3Schema:
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
properties:
minMember:
format: int32
type: integer
type: object
status:
properties:
succeeded:
format: int32
type: integer
failed:
format: int32
type: integer
running:
format: int32
type: integer
type: object
type: object
version: v1alpha1
3 changes: 3 additions & 0 deletions examples/distribution_strategy/distributed_tfjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ spec:
replicas: 3
restartPolicy: Never
template:
metadata:
annotations:
scheduling.k8s.io/group-name: "distributed-training"
spec:
containers:
- name: tensorflow
Expand Down
67 changes: 59 additions & 8 deletions pkg/common/jobcontroller/jobcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"strings"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
Expand Down Expand Up @@ -88,6 +90,9 @@ type JobController struct {
// kubeClientSet is a standard kubernetes clientset.
KubeClientSet kubeclientset.Interface

//KubeBatchClientSet is a standard kube-batch clientset.
KubeBatchClientSet kubebatchclient.Interface

// podLister can list/get pods from the shared informer's store.
PodLister corelisters.PodLister

Expand Down Expand Up @@ -135,6 +140,7 @@ func NewJobController(
reconcilerSyncPeriod metav1.Duration,
enableGangScheduling bool,
kubeClientSet kubeclientset.Interface,
kubeBatchClientSet kubebatchclient.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
workQueueName string) JobController {

Expand All @@ -160,14 +166,15 @@ func NewJobController(
}

jc := JobController{
Controller: controllerImpl,
Config: jobControllerConfig,
PodControl: realPodControl,
ServiceControl: realServiceControl,
KubeClientSet: kubeClientSet,
Expectations: controller.NewControllerExpectations(),
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
Recorder: recorder,
Controller: controllerImpl,
Config: jobControllerConfig,
PodControl: realPodControl,
ServiceControl: realServiceControl,
KubeClientSet: kubeClientSet,
KubeBatchClientSet: kubeBatchClientSet,
Expectations: controller.NewControllerExpectations(),
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
Recorder: recorder,
}
return jc

Expand Down Expand Up @@ -197,6 +204,31 @@ func (jc *JobController) GenLabels(jobName string) map[string]string {
}
}

func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32) (*v1alpha1.PodGroup, error) {

kubeBatchClientInterface := jc.KubeBatchClientSet
// Check whether podGroup exists or not
podGroup, err := kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
if err == nil {
return podGroup, nil
}

// create podGroup for gang scheduling by kube-batch
minAvailable := intstr.FromInt(int(minAvailableReplicas))
createPodGroup := &v1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: job.GetName(),
OwnerReferences: []metav1.OwnerReference{
*jc.GenOwnerReference(job),
},
},
Spec: v1alpha1.PodGroupSpec{
MinMember: minAvailable.IntVal,
},
}
return kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Create(createPodGroup)
}

// SyncPdb will create a PDB for gang scheduling by kube-arbitrator.
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error) {
labelJobName := jc.Controller.GetJobNameLabelKey()
Expand Down Expand Up @@ -231,6 +263,25 @@ func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32)
return jc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(job.GetNamespace()).Create(createPdb)
}

func (jc *JobController) DeletePodGroup(job metav1.Object) error {
kubeBatchClientInterface := jc.KubeBatchClientSet

//check whether podGroup exists or not
_, err := kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
if err != nil && k8serrors.IsNotFound(err) {
return nil
}

log.Infof("Deleting PodGroup %s", job.GetName())

//delete podGroup
err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Delete(job.GetName(), &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("unable to delete PodGroup: %v", err)
}
return nil
}

func (jc *JobController) DeletePdb(job metav1.Object) error {

// Check the pdb exist or not
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller.v1beta1/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -98,6 +99,7 @@ func NewTFController(
// This variable is for unstructured informer.
tfJobInformer tfjobinformersv1beta1.TFJobInformer,
kubeClientSet kubeclientset.Interface,
kubeBatchClientSet kubebatchclient.Interface,
tfJobClientSet tfjobclientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
// This field is not used now but we keep it since it will be used
Expand All @@ -116,7 +118,7 @@ func NewTFController(
// Create base controller
log.Info("Creating Job controller")
jc := jobcontroller.NewJobController(tc, metav1.Duration{Duration: 15 * time.Second},
option.EnableGangScheduling, kubeClientSet, kubeInformerFactory, tfv1beta1.Plural)
option.EnableGangScheduling, kubeClientSet, kubeBatchClientSet, kubeInformerFactory, tfv1beta1.Plural)
tc.JobController = jc
// Set sync handler.
tc.syncHandler = tc.syncTFJob
Expand Down
Loading

0 comments on commit 8e9e4c9

Please sign in to comment.