Skip to content

Commit

Permalink
Respect SchedulingPolicy
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y committed Feb 23, 2023
1 parent c21942d commit 68b2861
Show file tree
Hide file tree
Showing 11 changed files with 449 additions and 56 deletions.
4 changes: 2 additions & 2 deletions cmd/mpi-operator/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"
"time"

kubeflowScheme "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
corev1 "k8s.io/api/core/v1"
Expand All @@ -46,6 +45,7 @@ import (

"github.com/kubeflow/mpi-operator/cmd/mpi-operator/app/options"
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
kubeflowscheme "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
controllersv1 "github.com/kubeflow/mpi-operator/pkg/controller"
"github.com/kubeflow/mpi-operator/pkg/version"
Expand Down Expand Up @@ -126,7 +126,7 @@ func Run(opt *options.ServerOption) error {

// Add mpi-job-controller types to the default Kubernetes Scheme so Events
// can be logged for mpi-job-controller types.
err = kubeflowScheme.AddToScheme(clientgokubescheme.Scheme)
err = kubeflowscheme.AddToScheme(clientgokubescheme.Scheme)
if err != nil {
return fmt.Errorf("CoreV1 Add Scheme failed: %v", err)
}
Expand Down
22 changes: 20 additions & 2 deletions deploy/v2beta1/mpi-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7869,6 +7869,12 @@ spec:
e.g. gang-scheduling
properties:
minAvailable:
description: "MinAvailable defines the minimal number of member
to run the PodGroup. If the gang-scheduling is set to the
volcano, input is passed to `.spec.minMember` in PodGroup
for the volcano. When using this field, you need to make
sure the application supports resizing (e.g., Elastic Horovod).
\n If not set, it defaults to the number of workers."
format: int32
type: integer
minResources:
Expand All @@ -7878,14 +7884,26 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
description: MinResources defines the minimal resources of
members to run the PodGroup. If the gang-scheduling is set
to the volcano, input is passed to `.spec.minResources`
in PodGroup for volcano.
type: object
priorityClass:
description: PriorityClass defines the PodGroup's PriorityClass.
If the gang-scheduling is set to the volcano, input is passed
to `.spec.priorityClassName` in PodGroup for volcano.
type: string
queue:
description: Queue defines the queue name to allocate resource
for PodGroup. If the gang-scheduling is set to the volcano,
input is passed to `.spec.queue` in PodGroup for the volcano.
type: string
scheduleTimeoutSeconds:
description: 'ScheduleTimeoutSeconds defines the maximal
time of members to wait before running the PodGroup. Currently,
this parameter isn''t respected in any case. TODO (tenzen-y):
Modify comments when supporting scheduler-plugins.'
format: int32
type: integer
type: object
Expand Down
22 changes: 20 additions & 2 deletions manifests/base/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7846,6 +7846,12 @@ spec:
e.g. gang-scheduling
properties:
minAvailable:
description: "MinAvailable defines the minimal number of member
to run the PodGroup. If the gang-scheduling is set to the
volcano, input is passed to `.spec.minMember` in PodGroup
for the volcano. When using this field, you need to make
sure the application supports resizing (e.g., Elastic Horovod).
\n If not set, it defaults to the number of workers."
format: int32
type: integer
minResources:
Expand All @@ -7855,14 +7861,26 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
description: MinResources defines the minimal resources of
members to run the PodGroup. If the gang-scheduling is set
to the volcano, input is passed to `.spec.minResources`
in PodGroup for volcano.
type: object
priorityClass:
description: PriorityClass defines the PodGroup's PriorityClass.
If the gang-scheduling is set to the volcano, input is passed
to `.spec.priorityClassName` in PodGroup for volcano.
type: string
queue:
description: Queue defines the queue name to allocate resource
for PodGroup. If the gang-scheduling is set to the volcano,
input is passed to `.spec.queue` in PodGroup for the volcano.
type: string
scheduleTimeoutSeconds:
description: 'ScheduleTimeoutSeconds defines the maximal
time of members to wait before running the PodGroup. Currently,
this parameter isn''t respected in any case. TODO (tenzen-y):
Modify comments when supporting scheduler-plugins.'
format: int32
type: integer
type: object
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -415,23 +415,28 @@
"type": "object",
"properties": {
"minAvailable": {
"description": "MinAvailable defines the minimal number of member to run the PodGroup. If the gang-scheduling is set to the volcano, input is passed to `.spec.minMember` in PodGroup for the volcano. When using this field, you need to make sure the application supports resizing (e.g., Elastic Horovod).\n\nIf not set, it defaults to the number of workers.",
"type": "integer",
"format": "int32"
},
"minResources": {
"description": "MinResources defines the minimal resources of members to run the PodGroup. If the gang-scheduling is set to the volcano, input is passed to `.spec.minResources` in PodGroup for volcano.",
"type": "object",
"additionalProperties": {
"default": {},
"$ref": "#/definitions/resource.Quantity"
}
},
"priorityClass": {
"description": "PriorityClass defines the PodGroup's PriorityClass. If the gang-scheduling is set to the volcano, input is passed to `.spec.priorityClassName` in PodGroup for volcano.",
"type": "string"
},
"queue": {
"description": "Queue defines the queue name to allocate resource for PodGroup. If the gang-scheduling is set to the volcano, input is passed to `.spec.queue` in PodGroup for the volcano.",
"type": "string"
},
"scheduleTimeoutSeconds": {
"description": "ScheduleTimeoutSeconds defines the maximal time of members to wait before running the PodGroup. Currently, this parameter isn't respected in any case.",
"type": "integer",
"format": "int32"
}
Expand Down
37 changes: 32 additions & 5 deletions pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,38 @@ const (
// SchedulingPolicy encapsulates various scheduling policies of the distributed training
// job, for example `minAvailable` for gang-scheduling.
type SchedulingPolicy struct {
MinAvailable *int32 `json:"minAvailable,omitempty"`
Queue string `json:"queue,omitempty"`
MinResources *v1.ResourceList `json:"minResources,omitempty"`
PriorityClass string `json:"priorityClass,omitempty"`
ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
// MinAvailable defines the minimal number of member to run the PodGroup.
// If the gang-scheduling is set to the volcano,
// input is passed to `.spec.minMember` in PodGroup for the volcano.
// When using this field, you need to make sure the application supports resizing (e.g., Elastic Horovod).
//
// If not set, it defaults to the number of workers.
// +optional
MinAvailable *int32 `json:"minAvailable,omitempty"`

// Queue defines the queue name to allocate resource for PodGroup.
// If the gang-scheduling is set to the volcano,
// input is passed to `.spec.queue` in PodGroup for the volcano.
// +optional
Queue string `json:"queue,omitempty"`

// MinResources defines the minimal resources of members to run the PodGroup.
// If the gang-scheduling is set to the volcano,
// input is passed to `.spec.minResources` in PodGroup for volcano.
// +optional
MinResources *v1.ResourceList `json:"minResources,omitempty"`

// PriorityClass defines the PodGroup's PriorityClass.
// If the gang-scheduling is set to the volcano,
// input is passed to `.spec.priorityClassName` in PodGroup for volcano.
// +optional
PriorityClass string `json:"priorityClass,omitempty"`

// ScheduleTimeoutSeconds defines the maximal time of members to wait before running the PodGroup.
// Currently, this parameter isn't respected in any case.
// TODO (tenzen-y): Modify comments when supporting scheduler-plugins.
// +optional
ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
}

// RunPolicy encapsulates various runtime policies of the distributed training
Expand Down
57 changes: 18 additions & 39 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ func NewMPIJobController(
gangSchedulerName string) *MPIJobController {
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClientSet,
configMapInformer, secretInformer, serviceInformer, jobInformer,
podInformer, podgroupsInformer, mpiJobInformer, gangSchedulerName,
&clock.RealClock{})
podInformer, podgroupsInformer, mpiJobInformer,
gangSchedulerName, &clock.RealClock{})
}

// NewMPIJobController returns a new MPIJob controller.
Expand Down Expand Up @@ -387,13 +387,19 @@ func (c *MPIJobController) Run(threadiness int, stopCh <-chan struct{}) error {

// Wait for the caches to be synced before starting workers.
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.configMapSynced, c.secretSynced, c.serviceSynced, c.jobSynced, c.podSynced, c.mpiJobSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
synced := []cache.InformerSynced{
c.configMapSynced,
c.secretSynced,
c.serviceSynced,
c.jobSynced,
c.podSynced,
c.mpiJobSynced,
}
if c.gangSchedulerName != "" {
if ok := cache.WaitForCacheSync(stopCh, c.podgroupsSynced); !ok {
return fmt.Errorf("failed to wait for podgroup caches to sync")
}
synced = append(synced, c.podgroupsSynced)
}
if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

klog.Info("Starting workers")
Expand Down Expand Up @@ -570,7 +576,7 @@ func (c *MPIJobController) syncHandler(key string) error {
if !isMPIJobSuspended(mpiJob) {
// Get the PodGroup for this MPIJob
if c.gangSchedulerName != "" {
if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
if podgroup, err := c.getOrCreatePodGroups(mpiJob); podgroup == nil || err != nil {
return err
}
}
Expand Down Expand Up @@ -664,11 +670,11 @@ func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job
}

// getOrCreatePodGroups will create a PodGroup for gang scheduling by volcano.
func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob, minAvailableWorkerReplicas int32) (*podgroupv1beta1.PodGroup, error) {
func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob) (*podgroupv1beta1.PodGroup, error) {
podgroup, err := c.podgroupsLister.PodGroups(mpiJob.Namespace).Get(mpiJob.Name)
// If the PodGroup doesn't exist, we'll create it.
if errors.IsNotFound(err) {
podgroup, err = c.volcanoClient.SchedulingV1beta1().PodGroups(mpiJob.Namespace).Create(context.TODO(), newPodGroup(mpiJob, minAvailableWorkerReplicas), metav1.CreateOptions{})
podgroup, err = c.volcanoClient.SchedulingV1beta1().PodGroups(mpiJob.Namespace).Create(context.TODO(), newPodGroup(mpiJob), metav1.CreateOptions{})
}
// If an error occurs during Get/Create, we'll requeue the item so we
// can attempt processing again later. This could have been caused by a
Expand Down Expand Up @@ -844,7 +850,7 @@ func keysFromData(data map[string][]byte) []string {
return keys
}

// getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this
// getOrCreateWorker gets the worker Pods controlled by this
// MPIJob, or creates one if it doesn't exist.
func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1.Pod, error) {
var workerPods []*corev1.Pod
Expand Down Expand Up @@ -1296,38 +1302,11 @@ func newSSHAuthSecret(job *kubeflow.MPIJob) (*corev1.Secret, error) {
}, nil
}

// newPodGroup builds the PodGroup object for an MPIJob. The PodGroup shares the
// MPIJob's name and namespace and carries a controller OwnerReference pointing
// back at the MPIJob so handleObject can discover the MPIJob resource that
// 'owns' it.
//
// minAvailableReplicas is used as the PodGroup's MinMember, and the queue is
// read from the MPIJob's queue-name annotation.
func newPodGroup(mpiJob *kubeflow.MPIJob, minAvailableReplicas int32) *podgroupv1beta1.PodGroup {
	replicaSpecs := mpiJob.Spec.MPIReplicaSpecs

	// The launcher's priority class wins. The worker's priority class is only
	// consulted when a launcher spec exists and leaves the name empty.
	priorityClassName := ""
	if launcher := replicaSpecs[kubeflow.MPIReplicaTypeLauncher]; launcher != nil {
		priorityClassName = launcher.Template.Spec.PriorityClassName
		if priorityClassName == "" {
			if worker := replicaSpecs[kubeflow.MPIReplicaTypeWorker]; worker != nil {
				priorityClassName = worker.Template.Spec.PriorityClassName
			}
		}
	}

	owner := *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind)
	meta := metav1.ObjectMeta{
		Name:            mpiJob.Name,
		Namespace:       mpiJob.Namespace,
		OwnerReferences: []metav1.OwnerReference{owner},
	}
	spec := podgroupv1beta1.PodGroupSpec{
		MinMember:         minAvailableReplicas,
		Queue:             mpiJob.Annotations[podgroupv1beta1.QueueNameAnnotationKey],
		PriorityClassName: priorityClassName,
	}
	return &podgroupv1beta1.PodGroup{ObjectMeta: meta, Spec: spec}
}

// workerName returns the name for the worker at the given index, formatted as
// "<MPIJob name><workerSuffix>-<index>".
func workerName(mpiJob *kubeflow.MPIJob, index int) string {
return fmt.Sprintf("%s%s-%d", mpiJob.Name, workerSuffix, index)
}

// newWorker creates a new worker StatefulSet for an MPIJob resource. It also
// newWorker creates a new worker Pod for an MPIJob resource. It also
// sets the appropriate OwnerReferences on the resource so handleObject can
// discover the MPIJob resource that 'owns' it.
func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1.Pod {
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"k8s.io/utils/clock"
clocktesting "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"

podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanofake "volcano.sh/apis/pkg/client/clientset/versioned/fake"
volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions"
Expand Down
86 changes: 86 additions & 0 deletions pkg/controller/podgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 The Kubeflow Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
common "github.com/kubeflow/common/pkg/apis/common/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
)

// newPodGroup builds the volcano PodGroup for an MPIJob resource.
//
// Values set in .spec.runPolicy.schedulingPolicy take precedence. Any value
// that is left unset falls back to the following defaults:
//
//	minMember:     NUM(workers) + 1
//	queue:         the "scheduling.volcano.sh/queue-name" annotation value
//	priorityClass: the value returned by calcPriorityClassName
//	minResources:  nil
//
// The PodGroup shares the MPIJob's name and namespace and carries a controller
// OwnerReference so handleObject can discover the MPIJob resource that 'owns' it.
func newPodGroup(mpiJob *kubeflow.MPIJob) *volcanov1beta1.PodGroup {
	// Start from the defaults, then let the scheduling policy override them.
	var (
		members   = workerReplicas(mpiJob) + 1
		queue     = mpiJob.Annotations[volcanov1beta1.QueueNameAnnotationKey]
		resources *corev1.ResourceList
	)

	policy := mpiJob.Spec.RunPolicy.SchedulingPolicy
	if policy != nil {
		if policy.MinAvailable != nil {
			members = *policy.MinAvailable
		}
		if policy.Queue != "" {
			queue = policy.Queue
		}
		// A nil MinResources simply keeps the nil default.
		resources = policy.MinResources
	}

	owner := *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind)
	podGroup := &volcanov1beta1.PodGroup{}
	podGroup.ObjectMeta = metav1.ObjectMeta{
		Name:            mpiJob.Name,
		Namespace:       mpiJob.Namespace,
		OwnerReferences: []metav1.OwnerReference{owner},
	}
	podGroup.Spec = volcanov1beta1.PodGroupSpec{
		MinMember:         members,
		Queue:             queue,
		PriorityClassName: calcPriorityClassName(mpiJob.Spec.MPIReplicaSpecs, policy),
		MinResources:      resources,
	}
	return podGroup
}

// calcPriorityClassName picks the priorityClass name for the PodGroup. The
// first non-empty value in the following order wins:
//  1. .spec.runPolicy.schedulingPolicy.priorityClass
//  2. .spec.mpiReplicaSpecs[Launcher].template.spec.priorityClassName
//  3. .spec.mpiReplicaSpecs[Worker].template.spec.priorityClassName
//
// It returns an empty string when none of them is set.
func calcPriorityClassName(
	replicas map[kubeflow.MPIReplicaType]*common.ReplicaSpec,
	schedulingPolicy *kubeflow.SchedulingPolicy,
) string {
	if schedulingPolicy != nil && schedulingPolicy.PriorityClass != "" {
		return schedulingPolicy.PriorityClass
	}

	// Fall back to the replica templates, launcher first.
	lookupOrder := []kubeflow.MPIReplicaType{
		kubeflow.MPIReplicaTypeLauncher,
		kubeflow.MPIReplicaTypeWorker,
	}
	for _, replicaType := range lookupOrder {
		spec := replicas[replicaType]
		if spec == nil {
			continue
		}
		if name := spec.Template.Spec.PriorityClassName; name != "" {
			return name
		}
	}
	return ""
}
Loading

0 comments on commit 68b2861

Please sign in to comment.