Respect SchedulingPolicy #520
@@ -265,8 +265,8 @@ func NewMPIJobController(
 	gangSchedulerName string) *MPIJobController {
 	return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClientSet,
 		configMapInformer, secretInformer, serviceInformer, jobInformer,
-		podInformer, podgroupsInformer, mpiJobInformer, gangSchedulerName,
-		&clock.RealClock{})
+		podInformer, podgroupsInformer, mpiJobInformer,
+		gangSchedulerName, &clock.RealClock{})
 }

 // NewMPIJobController returns a new MPIJob controller.
@@ -387,13 +387,19 @@ func (c *MPIJobController) Run(threadiness int, stopCh <-chan struct{}) error {

 	// Wait for the caches to be synced before starting workers.
 	klog.Info("Waiting for informer caches to sync")
-	if ok := cache.WaitForCacheSync(stopCh, c.configMapSynced, c.secretSynced, c.serviceSynced, c.jobSynced, c.podSynced, c.mpiJobSynced); !ok {
-		return fmt.Errorf("failed to wait for caches to sync")
+	synced := []cache.InformerSynced{
+		c.configMapSynced,
+		c.secretSynced,
+		c.serviceSynced,
+		c.jobSynced,
+		c.podSynced,
+		c.mpiJobSynced,
 	}
 	if c.gangSchedulerName != "" {
-		if ok := cache.WaitForCacheSync(stopCh, c.podgroupsSynced); !ok {
-			return fmt.Errorf("failed to wait for podgroup caches to sync")
-		}
+		synced = append(synced, c.podgroupsSynced)
 	}
+	if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
+		return fmt.Errorf("failed to wait for caches to sync")
+	}

 	klog.Info("Starting workers")

Review discussion on this hunk:
Reviewer: we also don't need the priorityClass data unless there is gangSchedulerName enabled?
Author: Yes, that's right.
Author: I will change the above logic.
Reviewer: still not changed
Author: Done.
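One possible shape for the follow-up the reviewer asks for, as a hedged sketch only: it assumes the controller also keeps a priorityClass informer and exposes a c.priorityClassSynced function. Neither name appears in this excerpt, so both are hypothetical; the point is simply that gang-scheduling-only caches would be gated on gangSchedulerName the same way podgroupsSynced is.

	if c.gangSchedulerName != "" {
		// Hypothetical: wait for gang-scheduling caches (PodGroups and, if the
		// controller watches them, PriorityClasses) only when gang scheduling is enabled.
		synced = append(synced, c.podgroupsSynced, c.priorityClassSynced)
	}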
@@ -570,7 +576,7 @@ func (c *MPIJobController) syncHandler(key string) error {
 	if !isMPIJobSuspended(mpiJob) {
 		// Get the PodGroup for this MPIJob
 		if c.gangSchedulerName != "" {
-			if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
+			if podgroup, err := c.getOrCreatePodGroups(mpiJob); podgroup == nil || err != nil {
 				return err
 			}
 		}
@@ -664,11 +670,11 @@ func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job
 }

 // getOrCreatePodGroups will create a PodGroup for gang scheduling by volcano.
-func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob, minAvailableWorkerReplicas int32) (*podgroupv1beta1.PodGroup, error) {
+func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob) (*podgroupv1beta1.PodGroup, error) {
 	podgroup, err := c.podgroupsLister.PodGroups(mpiJob.Namespace).Get(mpiJob.Name)
 	// If the PodGroup doesn't exist, we'll create it.
 	if errors.IsNotFound(err) {
-		podgroup, err = c.volcanoClient.SchedulingV1beta1().PodGroups(mpiJob.Namespace).Create(context.TODO(), newPodGroup(mpiJob, minAvailableWorkerReplicas), metav1.CreateOptions{})
+		podgroup, err = c.volcanoClient.SchedulingV1beta1().PodGroups(mpiJob.Namespace).Create(context.TODO(), newPodGroup(mpiJob), metav1.CreateOptions{})
 	}
 	// If an error occurs during Get/Create, we'll requeue the item so we
 	// can attempt processing again later. This could have been caused by a
@@ -844,7 +850,7 @@ func keysFromData(data map[string][]byte) []string {
 	return keys
 }

-// getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this
+// getOrCreateWorkerStatefulSet gets the worker Pod controlled by this
 // MPIJob, or creates one if it doesn't exist.
 func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1.Pod, error) {
 	var workerPods []*corev1.Pod
@@ -1296,38 +1302,11 @@ func newSSHAuthSecret(job *kubeflow.MPIJob) (*corev1.Secret, error) {
 	}, nil
 }

-// newPodGroup creates a new PodGroup for an MPIJob
-// resource. It also sets the appropriate OwnerReferences on the resource so
-// handleObject can discover the MPIJob resource that 'owns' it.
-func newPodGroup(mpiJob *kubeflow.MPIJob, minAvailableReplicas int32) *podgroupv1beta1.PodGroup {
-	var pName string
-	if l := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher]; l != nil {
-		pName = l.Template.Spec.PriorityClassName
-		if w := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]; pName == "" && w != nil {
-			pName = w.Template.Spec.PriorityClassName
-		}
-	}
-	return &podgroupv1beta1.PodGroup{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      mpiJob.Name,
-			Namespace: mpiJob.Namespace,
-			OwnerReferences: []metav1.OwnerReference{
-				*metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind),
-			},
-		},
-		Spec: podgroupv1beta1.PodGroupSpec{
-			MinMember:         minAvailableReplicas,
-			Queue:             mpiJob.Annotations[podgroupv1beta1.QueueNameAnnotationKey],
-			PriorityClassName: pName,
-		},
-	}
-}
-
 func workerName(mpiJob *kubeflow.MPIJob, index int) string {
 	return fmt.Sprintf("%s%s-%d", mpiJob.Name, workerSuffix, index)
 }

-// newWorker creates a new worker StatefulSet for an MPIJob resource. It also
+// newWorker creates a new worker Pod for an MPIJob resource. It also
 // sets the appropriate OwnerReferences on the resource so handleObject can
 // discover the MPIJob resource that 'owns' it.
 func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1.Pod {
@@ -0,0 +1,86 @@ (new file)
// Copyright 2023 The Kubeflow Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	common "github.com/kubeflow/common/pkg/apis/common/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
)

// newPodGroup will create a new PodGroup for an MPIJob
// resource. If the parameters set in the schedulingPolicy aren't empty, it will pass them to the new PodGroup;
// if they are empty, it will set the following default values:
//
//	minMember: NUM(workers) + 1
//	queue: the "scheduling.volcano.sh/queue-name" annotation value.
//	priorityClass: the value returned from the calcPriorityClassName function.
//	minResources: nil
//
// It also sets the appropriate OwnerReferences on the resource so
// handleObject can discover the MPIJob resource that 'owns' it.
Review discussion on this doc comment:
Reviewer: explain how minResources is calculated. What does volcano use that field for? If it's all added up, how does it know whether the resources are consumed by one or multiple pods?
Author: Sure. I will answer in another thread.
Reviewer: add in the comment the fields that this function populates and how.
Author: Done.
func newPodGroup(mpiJob *kubeflow.MPIJob) *volcanov1beta1.PodGroup {
	minMember := workerReplicas(mpiJob) + 1
	queueName := mpiJob.Annotations[volcanov1beta1.QueueNameAnnotationKey]
	var minResources *corev1.ResourceList
	if schedulingPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedulingPolicy != nil {
		if schedulingPolicy.MinAvailable != nil {
			minMember = *schedulingPolicy.MinAvailable
Review discussion on schedulingPolicy.MinAvailable:
Reviewer: what if volcano schedules a number of pods that is less than the number of worker replicas? We need a way to restrict the usage of this field, or have clear documentation that this is not supported for all kinds of MPI applications.
Author: In that case, volcano schedules all Pods. If the cluster does not have enough resources for all Pods, some pods will be marked as Pending.
Author: I agree. Since we don't have a field for whether the MPIJob uses elastic Horovod, I think we have 2 options. I like the second option. What do you think? @alculquicondor
Reviewer: Are there similar fields in the training operator?
Author: No, most custom-job controllers have no field for whether the CustomJob uses elastic mode. This means users can set any value for minAvailable.
Reviewer: in that case, I think we should proceed with option 1 for now.
Reviewer: That said, there is currently no documentation about SchedulingPolicy overall :)
Author: I agree. Maybe we can add documentation to https://www.kubeflow.org/docs/components/training/job-scheduling/.
Author: You're right...
Reviewer: another thought: even if we add an "elastic" field, it doesn't mean that the application will support it. So we need to clarify it in the documentation anyway.
Author: I agree. Probably we can discuss whether to add an "elastic" field.
		}
		if len(schedulingPolicy.Queue) != 0 {
			queueName = schedulingPolicy.Queue
		}
		if schedulingPolicy.MinResources != nil {
			minResources = schedulingPolicy.MinResources
		}
	}
	return &volcanov1beta1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Name:      mpiJob.Name,
			Namespace: mpiJob.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind),
			},
		},
		Spec: volcanov1beta1.PodGroupSpec{
			MinMember:         minMember,
			Queue:             queueName,
			PriorityClassName: calcPriorityClassName(mpiJob.Spec.MPIReplicaSpecs, mpiJob.Spec.RunPolicy.SchedulingPolicy),
			MinResources:      minResources,
		},
	}
}
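A minimal illustrative sketch, not part of this PR, of how newPodGroup maps .spec.runPolicy.schedulingPolicy onto the generated PodGroup: explicit schedulingPolicy values win over the NUM(workers)+1 and queue-annotation defaults described in the doc comment. It assumes it lives in the same controller package so it can call newPodGroup, and that the v2beta1 MPIJobSpec and RunPolicy field names match those used elsewhere in this diff; the job name, queue, and resource values are made up.

package controller

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	common "github.com/kubeflow/common/pkg/apis/common/v1"
	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
)

// TestNewPodGroupRespectsSchedulingPolicy sketches the mapping described in
// the newPodGroup doc comment: values set in schedulingPolicy override the
// NUM(workers)+1 and queue-name-annotation defaults.
func TestNewPodGroupRespectsSchedulingPolicy(t *testing.T) {
	minAvailable := int32(3)
	workers := int32(4)
	mpiJob := &kubeflow.MPIJob{
		ObjectMeta: metav1.ObjectMeta{Name: "pi", Namespace: "default"},
		Spec: kubeflow.MPIJobSpec{
			RunPolicy: kubeflow.RunPolicy{
				SchedulingPolicy: &kubeflow.SchedulingPolicy{
					MinAvailable:  &minAvailable, // overrides NUM(workers)+1, which would be 5 here
					Queue:         "training",    // overrides the queue-name annotation
					PriorityClass: "high",        // wins over any pod template priorityClassName
					MinResources: &corev1.ResourceList{
						corev1.ResourceCPU: resource.MustParse("8"),
					},
				},
			},
			MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{
				kubeflow.MPIReplicaTypeWorker: {Replicas: &workers},
			},
		},
	}

	pg := newPodGroup(mpiJob)
	if pg.Spec.MinMember != 3 || pg.Spec.Queue != "training" || pg.Spec.PriorityClassName != "high" {
		t.Errorf("unexpected PodGroup spec: %+v", pg.Spec)
	}
	if pg.Spec.MinResources == nil {
		t.Errorf("expected minResources to be passed through from the schedulingPolicy")
	}
}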

// calcPriorityClassName calculates the priorityClass name needed for podGroup according to the following priorities:
//  1. .spec.runPolicy.schedulingPolicy.priorityClass
//  2. .spec.mpiReplicaSpecs[Launcher].template.spec.priorityClassName
//  3. .spec.mpiReplicaSpecs[Worker].template.spec.priorityClassName
func calcPriorityClassName(
	replicas map[kubeflow.MPIReplicaType]*common.ReplicaSpec,
	schedulingPolicy *kubeflow.SchedulingPolicy,
) string {
	if schedulingPolicy != nil && len(schedulingPolicy.PriorityClass) != 0 {
		return schedulingPolicy.PriorityClass
	} else if l := replicas[kubeflow.MPIReplicaTypeLauncher]; l != nil && len(l.Template.Spec.PriorityClassName) != 0 {
		return l.Template.Spec.PriorityClassName
	} else if w := replicas[kubeflow.MPIReplicaTypeWorker]; w != nil && len(w.Template.Spec.PriorityClassName) != 0 {
		return w.Template.Spec.PriorityClassName
	} else {
		return ""
	}
}
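A companion sketch, under the same assumptions and package as the previous one (same imports plus "testing"), of the precedence that the calcPriorityClassName comment describes. The priority class names here are made up.

// TestCalcPriorityClassNamePrecedence sketches the fallback order:
// schedulingPolicy.priorityClass, then the launcher template, then the worker template.
func TestCalcPriorityClassNamePrecedence(t *testing.T) {
	replicas := map[kubeflow.MPIReplicaType]*common.ReplicaSpec{
		kubeflow.MPIReplicaTypeLauncher: {Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{PriorityClassName: "launcher-priority"},
		}},
		kubeflow.MPIReplicaTypeWorker: {Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{PriorityClassName: "worker-priority"},
		}},
	}

	// 1. schedulingPolicy.priorityClass wins when set.
	if got := calcPriorityClassName(replicas, &kubeflow.SchedulingPolicy{PriorityClass: "job-priority"}); got != "job-priority" {
		t.Errorf("got %q, want job-priority", got)
	}
	// 2. Otherwise the launcher's pod template is used.
	if got := calcPriorityClassName(replicas, nil); got != "launcher-priority" {
		t.Errorf("got %q, want launcher-priority", got)
	}
	// 3. The worker's pod template is the last fallback.
	delete(replicas, kubeflow.MPIReplicaTypeLauncher)
	if got := calcPriorityClassName(replicas, nil); got != "worker-priority" {
		t.Errorf("got %q, want worker-priority", got)
	}
}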
Further review discussion:
Reviewer: Explain that this will only work if the MPI application supports resizing.
Author: Sounds good.
Author: Done.
Reviewer: If not set, it defaults to the number of workers.