Support processing resource types other than GPU #75

Merged (7 commits, Jan 15, 2019)
Changes from 6 commits
20 changes: 15 additions & 5 deletions cmd/mpi-operator/main.go
@@ -29,11 +29,13 @@ import (
)

var (
masterURL string
kubeConfig string
gpusPerNode int
kubectlDeliveryImage string
namespace string
masterURL string
kubeConfig string
gpusPerNode int
processingUnitsPerNode int
processingResourceType string
kubectlDeliveryImage string
namespace string
)

func main() {
@@ -78,6 +80,8 @@ func main() {
kubeInformerFactory.Batch().V1().Jobs(),
kubeflowInformerFactory.Kubeflow().V1alpha1().MPIJobs(),
gpusPerNode,
processingUnitsPerNode,
processingResourceType,
kubectlDeliveryImage)

go kubeInformerFactory.Start(stopCh)
@@ -98,4 +102,10 @@ func init() {
"The maximum number of GPUs available per node. Note that this will be ignored if the GPU resources are explicitly specified in the MPIJob pod spec.")
flag.StringVar(&kubectlDeliveryImage, "kubectl-delivery-image", "", "The container image used to deliver the kubectl binary.")
flag.StringVar(&namespace, "namespace", "", "The namespace used to obtain the listers.")
flag.IntVar(
&processingUnitsPerNode,
"processing-units-per-node",
1,
"The maximum number of processing units available per node. Note that this will be ignored if the processing resources are explicitly specified in the MPIJob pod spec.")
flag.StringVar(&processingResourceType, "processing-resource-type", "nvidia.com/gpu", "The compute resource name, e.g. 'nvidia.com/gpu' or 'cpu'.")
}
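
The two new flags default to 1 processing unit per node and the NVIDIA GPU resource name; an unsupported resource type is only handled later, inside the controller, where it silently falls back to GPUs. A minimal sketch (not part of this PR) of an early startup check one could add, assuming only the two resource names the controller recognizes:

```go
package main

import (
	"flag"
	"fmt"
	"log"
)

// validateProcessingResourceType is a hypothetical guard (not in this PR):
// it rejects unsupported --processing-resource-type values at startup
// instead of relying on the controller's silent fallback to nvidia.com/gpu.
func validateProcessingResourceType(t string) error {
	switch t {
	case "nvidia.com/gpu", "cpu":
		return nil
	default:
		return fmt.Errorf("unsupported processing-resource-type %q; expected 'nvidia.com/gpu' or 'cpu'", t)
	}
}

func main() {
	resourceType := flag.String("processing-resource-type", "nvidia.com/gpu", "The compute resource name.")
	flag.Parse()
	if err := validateProcessingResourceType(*resourceType); err != nil {
		log.Fatal(err)
	}
}
```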
15 changes: 13 additions & 2 deletions pkg/apis/kubeflow/v1alpha1/types.go
@@ -40,16 +40,27 @@ type MPIJobList struct {
type MPIJobSpec struct {
// Specifies the desired number of GPUs the MPIJob should run on.
// Mutually exclusive with the `Replicas` field.
// Note that this is deprecated in favor of `ProcessingUnits` field.
// +optional
GPUs *int32 `json:"gpus,omitempty"`

// Specifies the desired number of processing units the MPIJob should run on.
// Mutually exclusive with the `Replicas` field.
// +optional
ProcessingUnits *int32 `json:"processingUnits,omitempty"`

// Specifies the number of slots per worker used in hostfile.
// Defaults to the number of processing units per worker.
// +optional
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

// Run the launcher on the master.
// Optional: Default to false
// Defaults to false.
// +optional
LauncherOnMaster bool `json:"launcherOnMaster,omitempty"`

// Specifies the number of retries before marking this job failed.
// Defaults to 6
// Defaults to 6.
// +optional
BackoffLimit *int32 `json:"backoffLimit,omitempty"`

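
As a usage illustration (not taken from this PR), a client building an MPIJobSpec with the new field might write something like the sketch below. The import path is assumed from the repository layout, and SlotsPerWorker is left unset so the controller defaults it to the processing units per worker:

```go
package main

import (
	"fmt"

	// Import path assumed from the repository layout shown in this PR.
	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha1"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
	// A CPU-oriented job requesting 8 processing units in total; the
	// controller splits them into workers based on --processing-units-per-node.
	spec := kubeflow.MPIJobSpec{
		ProcessingUnits: int32Ptr(8),
		// SlotsPerWorker omitted: defaults to the processing units per worker.
	}
	fmt.Printf("%+v\n", spec)
}
```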
10 changes: 10 additions & 0 deletions pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

148 changes: 98 additions & 50 deletions pkg/controllers/mpi_job_controller.go
@@ -67,6 +67,7 @@ const (
launcherSuffix = "-launcher"
workerSuffix = "-worker"
gpuResourceName = "nvidia.com/gpu"
cpuResourceName = "cpu"
labelGroupName = "group_name"
labelMPIJobName = "mpi_job_name"
labelMPIRoleType = "mpi_role_type"
@@ -125,6 +126,10 @@ type MPIJobController struct {
recorder record.EventRecorder
// The maximum number of GPUs per node.
gpusPerNode int
// The maximum number of processing units per node.
processingUnitsPerNode int
// The processing resource name, e.g. "nvidia.com/gpu" or "cpu"
processingResourceType string
// The container image used to deliver the kubectl binary.
kubectlDeliveryImage string
}
@@ -141,6 +146,8 @@ func NewMPIJobController(
jobInformer batchinformers.JobInformer,
mpiJobInformer informers.MPIJobInformer,
gpusPerNode int,
processingUnitsPerNode int,
processingResourceType string,
kubectlDeliveryImage string) *MPIJobController {

// Create event broadcaster.
@@ -154,26 +161,28 @@
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

controller := &MPIJobController{
kubeClient: kubeClient,
kubeflowClient: kubeflowClient,
configMapLister: configMapInformer.Lister(),
configMapSynced: configMapInformer.Informer().HasSynced,
serviceAccountLister: serviceAccountInformer.Lister(),
serviceAccountSynced: serviceAccountInformer.Informer().HasSynced,
roleLister: roleInformer.Lister(),
roleSynced: roleInformer.Informer().HasSynced,
roleBindingLister: roleBindingInformer.Lister(),
roleBindingSynced: roleBindingInformer.Informer().HasSynced,
statefulSetLister: statefulSetInformer.Lister(),
statefulSetSynced: statefulSetInformer.Informer().HasSynced,
jobLister: jobInformer.Lister(),
jobSynced: jobInformer.Informer().HasSynced,
mpiJobLister: mpiJobInformer.Lister(),
mpiJobSynced: mpiJobInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"),
recorder: recorder,
gpusPerNode: gpusPerNode,
kubectlDeliveryImage: kubectlDeliveryImage,
kubeClient: kubeClient,
kubeflowClient: kubeflowClient,
configMapLister: configMapInformer.Lister(),
configMapSynced: configMapInformer.Informer().HasSynced,
serviceAccountLister: serviceAccountInformer.Lister(),
serviceAccountSynced: serviceAccountInformer.Informer().HasSynced,
roleLister: roleInformer.Lister(),
roleSynced: roleInformer.Informer().HasSynced,
roleBindingLister: roleBindingInformer.Lister(),
roleBindingSynced: roleBindingInformer.Informer().HasSynced,
statefulSetLister: statefulSetInformer.Lister(),
statefulSetSynced: statefulSetInformer.Informer().HasSynced,
jobLister: jobInformer.Lister(),
jobSynced: jobInformer.Informer().HasSynced,
mpiJobLister: mpiJobInformer.Lister(),
mpiJobSynced: mpiJobInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"),
recorder: recorder,
gpusPerNode: gpusPerNode,
processingUnitsPerNode: processingUnitsPerNode,
processingResourceType: processingResourceType,
kubectlDeliveryImage: kubectlDeliveryImage,
}

glog.Info("Setting up event handlers")
@@ -406,15 +415,15 @@ func (c *MPIJobController) syncHandler(key string) error {
// We're done if the launcher either succeeded or failed.
done := launcher != nil && (launcher.Status.Succeeded == 1 || launcher.Status.Failed == 1)

workerReplicas, gpusPerWorker, err := allocateGPUs(mpiJob, c.gpusPerNode, done)
workerReplicas, processingUnitsPerWorker, err := allocateProcessingUnits(mpiJob, c.gpusPerNode, c.processingUnitsPerNode, c.processingResourceType, done)
if err != nil {
runtime.HandleError(err)
return nil
}

if !done {
// Get the ConfigMap for this MPIJob.
if config, err := c.getOrCreateConfigMap(mpiJob, workerReplicas, gpusPerWorker); config == nil || err != nil {
if config, err := c.getOrCreateConfigMap(mpiJob, workerReplicas, processingUnitsPerWorker); config == nil || err != nil {
return err
}

@@ -434,7 +443,7 @@
}
}

worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas, gpusPerWorker)
worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas, processingUnitsPerWorker, c.processingResourceType)
if err != nil {
return err
}
@@ -483,45 +492,67 @@ func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job
return launcher, nil
}

// allocateGPUs allocates the worker replicas and GPUs per worker.
func allocateGPUs(mpiJob *kubeflow.MPIJob, gpusPerNode int, done bool) (workerReplicas int, gpusPerWorker int, err error) {
// allocateProcessingUnits allocates the worker replicas and processing units per worker.
func allocateProcessingUnits(
mpiJob *kubeflow.MPIJob,
gpusPerNode int,
processingUnitsPerNode int,
processingResourceType string,
done bool) (workerReplicas int, processingUnitsPerWorker int, err error) {
workerReplicas = 0
gpusPerWorker = 0
processingUnitsPerWorker = 0
err = nil
if mpiJob.Spec.GPUs != nil {
totalGPUs := int(*mpiJob.Spec.GPUs)
if totalGPUs < gpusPerNode {
workerReplicas = 1
gpusPerWorker = totalGPUs
} else if totalGPUs%gpusPerNode == 0 {
workerReplicas = totalGPUs / gpusPerNode
gpusPerWorker = gpusPerNode
if mpiJob.Spec.GPUs != nil || mpiJob.Spec.ProcessingUnits != nil {
if mpiJob.Spec.ProcessingUnits != nil && mpiJob.Spec.GPUs != nil {
err = fmt.Errorf("Cannot specify both GPUs and ProcessingUnits at the same time")
} else {
err = fmt.Errorf("specified #GPUs is not a multiple of GPUs per node (%d)", gpusPerNode)
totalProcessingUnits := 0
usedSpecField := ""
pusPerNode := 0
if mpiJob.Spec.GPUs != nil {
fmt.Println("GPUs field is deprecated. Please switch to use ProcessingUnits.")
totalProcessingUnits = int(*mpiJob.Spec.GPUs)
pusPerNode = gpusPerNode
usedSpecField = "GPUs"
} else if mpiJob.Spec.ProcessingUnits != nil {
totalProcessingUnits = int(*mpiJob.Spec.ProcessingUnits)
pusPerNode = processingUnitsPerNode
usedSpecField = "ProcessingUnits"
}
if totalProcessingUnits < pusPerNode {
workerReplicas = 1
processingUnitsPerWorker = totalProcessingUnits
} else if totalProcessingUnits%pusPerNode == 0 {
workerReplicas = totalProcessingUnits / pusPerNode
processingUnitsPerWorker = pusPerNode
} else {
err = fmt.Errorf(
"specified #%s is not a multiple of %s per node (%d)", usedSpecField, usedSpecField, processingUnitsPerNode)
}
}
} else if mpiJob.Spec.Replicas != nil {
workerReplicas = int(*mpiJob.Spec.Replicas)
container := mpiJob.Spec.Template.Spec.Containers[0]
if container.Resources.Limits != nil {
if val, ok := container.Resources.Limits[gpuResourceName]; ok {
gpus, _ := val.AsInt64()
gpusPerWorker = int(gpus)
if val, ok := container.Resources.Limits[convertProcessingResourceType(processingResourceType)]; ok {
processingUnits, _ := val.AsInt64()
processingUnitsPerWorker = int(processingUnits)
}
}
}
if done {
workerReplicas = 0
}
return workerReplicas, gpusPerWorker, err
return workerReplicas, processingUnitsPerWorker, err
}

// getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates
// one if it doesn't exist.
func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) (*corev1.ConfigMap, error) {
func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int) (*corev1.ConfigMap, error) {
cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix)
// If the ConfigMap doesn't exist, we'll create it.
if errors.IsNotFound(err) {
cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Create(newConfigMap(mpiJob, workerReplicas, gpusPerWorker))
cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Create(newConfigMap(mpiJob, workerReplicas, processingUnitsPerWorker))
}
// If an error occurs during Get/Create, we'll requeue the item so we
// can attempt processing again later. This could have been caused by a
@@ -616,11 +647,11 @@ func (c *MPIJobController) getLauncherRoleBinding(mpiJob *kubeflow.MPIJob) (*rba

// getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this
// MPIJob, or creates one if it doesn't exist.
func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) (*appsv1.StatefulSet, error) {
func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int, processingResourceType string) (*appsv1.StatefulSet, error) {
worker, err := c.statefulSetLister.StatefulSets(mpiJob.Namespace).Get(mpiJob.Name + workerSuffix)
// If the StatefulSet doesn't exist, we'll create it.
if errors.IsNotFound(err) && workerReplicas > 0 {
worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Create(newWorker(mpiJob, int32(workerReplicas), gpusPerWorker))
worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Create(newWorker(mpiJob, int32(workerReplicas), processingUnitsPerWorker, processingResourceType))
}
// If an error occurs during Get/Create, we'll requeue the item so we
// can attempt processing again later. This could have been caused by a
@@ -639,7 +670,7 @@ func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob,

// If the worker is out of date, update the worker.
if worker != nil && int(*worker.Spec.Replicas) != workerReplicas {
worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Update(newWorker(mpiJob, int32(workerReplicas), gpusPerWorker))
worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Update(newWorker(mpiJob, int32(workerReplicas), processingUnitsPerWorker, processingResourceType))
// If an error occurs during Update, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
@@ -732,18 +763,22 @@ func (c *MPIJobController) handleObject(obj interface{}) {
// newConfigMap creates a new ConfigMap containing configurations for an MPIJob
// resource. It also sets the appropriate OwnerReferences on the resource so
// handleObject can discover the MPIJob resource that 'owns' it.
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) *corev1.ConfigMap {
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int) *corev1.ConfigMap {
kubexec := fmt.Sprintf(`#!/bin/sh
set -x
POD_NAME=$1
shift
%s/kubectl exec ${POD_NAME} -- /bin/sh -c "$*"
`, kubectlMountPath)

// If no GPU is specified, default to 1 slot.
// If no processing unit is specified, default to 1 slot.
slots := 1
if gpusPerWorker > 0 {
slots = gpusPerWorker
if mpiJob.Spec.SlotsPerWorker == nil {
if processingUnitsPerWorker > 0 {
slots = processingUnitsPerWorker
}
} else {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
var buffer bytes.Buffer
for i := 0; i < workerReplicas; i++ {
@@ -850,10 +885,23 @@ func newLauncherRoleBinding(mpiJob *kubeflow.MPIJob) *rbacv1.RoleBinding {
}
}

func convertProcessingResourceType(processingResourceType string) corev1.ResourceName {
if processingResourceType == gpuResourceName {
return gpuResourceName
} else if processingResourceType == cpuResourceName {
return cpuResourceName
}
fmt.Printf(
"Unsupported processing resource type specified: %q. \nSwitching to use NVIDIA GPU by default.",
processingResourceType)

return gpuResourceName
}
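
The mapping above only recognizes the two constant resource names; anything else prints a warning and falls back to NVIDIA GPUs. A small standalone sketch of that behavior (reimplemented here because the controller's function is unexported; "amd.com/gpu" is just an assumed example of an unsupported value):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// convert mirrors convertProcessingResourceType's fallback; a local sketch only.
func convert(t string) corev1.ResourceName {
	switch t {
	case "nvidia.com/gpu", "cpu":
		return corev1.ResourceName(t)
	default:
		// Unsupported values fall back to NVIDIA GPUs, as in the diff above.
		return corev1.ResourceName("nvidia.com/gpu")
	}
}

func main() {
	fmt.Println(convert("cpu"))         // cpu
	fmt.Println(convert("amd.com/gpu")) // nvidia.com/gpu (fallback)
}
```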

// newWorker creates a new worker StatefulSet for an MPIJob resource. It also
// sets the appropriate OwnerReferences on the resource so handleObject can
// discover the MPIJob resource that 'owns' it.
func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, gpus int) *appsv1.StatefulSet {
func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, processingUnits int, processingResourceType string) *appsv1.StatefulSet {
labels := map[string]string{
labelGroupName: "kubeflow.org",
labelMPIJobName: mpiJob.Name,
Expand All @@ -878,7 +926,7 @@ func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, gpus int) *appsv1
if container.Resources.Limits == nil {
container.Resources.Limits = make(corev1.ResourceList)
}
container.Resources.Limits[gpuResourceName] = *resource.NewQuantity(int64(gpus), resource.DecimalExponent)
container.Resources.Limits[convertProcessingResourceType(processingResourceType)] = *resource.NewQuantity(int64(processingUnits), resource.DecimalExponent)

// We need the kubexec.sh script here because Open MPI checks for the path
// in every rank.
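
For a CPU-based job, the limit set by newWorker resolves to a plain "cpu" entry rather than "nvidia.com/gpu". A minimal sketch with assumed values (4 processing units per worker), using the same corev1 and resource packages as the code above:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Assumed values: processingResourceType = "cpu", processingUnits = 4.
	limits := corev1.ResourceList{
		corev1.ResourceName("cpu"): *resource.NewQuantity(4, resource.DecimalExponent),
	}
	// Print the single limit entry that would land on the worker container.
	for name, q := range limits {
		fmt.Printf("%s = %s\n", name, q.String())
	}
}
```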