Skip to content

Commit

Permalink
Run workers first and wait for them (#484)
Browse files Browse the repository at this point in the history
* Real rebase of waitforworkes option

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix generated API

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix format

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Add docs

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix typo

* Add tests for waitforworkers

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Add missing err test

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix cleanpodpolicy

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Remove debug

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix tests

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Rework api

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix generated api

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* One more fix of api

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Swagger fix

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix readme

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix readme again

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Add comments

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Add kubebuilder annotations

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix manifests

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

---------

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>
  • Loading branch information
xhejtman committed Jun 26, 2023
1 parent 7b33b4e commit f8d815c
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 28 deletions.
6 changes: 6 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ spec:
type: object
spec:
properties:
launcherCreationPolicy:
default: AtStartup
description: launcherCreationPolicy if WaitForWorkersReady, the launcher
is created only after all workers are in Ready state. Defaults to
AtStartup.
type: string
mpiImplementation:
default: OpenMPI
description: MPIImplementation is the MPI implementation. Options
Expand Down
6 changes: 6 additions & 0 deletions manifests/base/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ spec:
type: object
spec:
properties:
launcherCreationPolicy:
default: AtStartup
description: launcherCreationPolicy if WaitForWorkersReady, the launcher
is created only after all workers are in Ready state. Defaults to
AtStartup.
type: string
mpiImplementation:
default: OpenMPI
description: MPIImplementation is the MPI implementation. Options
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/kubeflow/v2beta1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func SetDefaults_MPIJob(mpiJob *MPIJob) {
if mpiJob.Spec.MPIImplementation == "" {
mpiJob.Spec.MPIImplementation = MPIImplementationOpenMPI
}
if mpiJob.Spec.LauncherCreationPolicy == "" {
mpiJob.Spec.LauncherCreationPolicy = LauncherCreationPolicyAtStartup
}

// set default to Launcher
setDefaultsTypeLauncher(mpiJob.Spec.MPIReplicaSpecs[MPIReplicaTypeLauncher])
Expand Down
35 changes: 21 additions & 14 deletions pkg/apis/kubeflow/v2beta1/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
RunPolicy: RunPolicy{
CleanPodPolicy: NewCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
LauncherCreationPolicy: "AtStartup",
},
},
},
Expand All @@ -48,8 +49,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
LauncherCreationPolicy: "AtStartup",
},
},
want: MPIJob{
Expand All @@ -61,8 +63,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
LauncherCreationPolicy: "AtStartup",
},
},
},
Expand All @@ -76,8 +79,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
LauncherCreationPolicy: "AtStartup",
},
},
want: MPIJob{
Expand All @@ -89,8 +93,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
LauncherCreationPolicy: "AtStartup",
},
},
},
Expand All @@ -108,8 +113,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
RunPolicy: RunPolicy{
CleanPodPolicy: NewCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
LauncherCreationPolicy: "AtStartup",
MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{
MPIReplicaTypeLauncher: {
Replicas: newInt32(1),
Expand All @@ -133,8 +139,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
RunPolicy: RunPolicy{
CleanPodPolicy: NewCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
LauncherCreationPolicy: "AtStartup",
MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{
MPIReplicaTypeWorker: {
Replicas: newInt32(0),
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow/v2beta1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@
"mpiReplicaSpecs"
],
"properties": {
"launcherCreationPolicy": {
"description": "launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup.",
"type": "string"
},
"mpiImplementation": {
"description": "MPIImplementation is the MPI implementation. Options are \"OpenMPI\" (default), \"Intel\" and \"MPICH\".",
"type": "string"
Expand Down
18 changes: 18 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ type RunPolicy struct {
Suspend *bool `json:"suspend,omitempty"`
}

type LauncherCreationPolicy string

const (
// LauncherCreationPolicyAtStartup is default behavior
// when Launcher is started in parallel with workers
LauncherCreationPolicyAtStartup LauncherCreationPolicy = "AtStartup"

// LauncherCreationPolicyWaitForWorkersReady makes Launcher reation
// postponed until all workers are in ready state so that the Launcher
// does not fail trying to connect to worker.
LauncherCreationPolicyWaitForWorkersReady LauncherCreationPolicy = "WaitForWorkersReady"
)

type MPIJobSpec struct {

// Specifies the number of slots per worker used in hostfile.
Expand All @@ -154,6 +167,11 @@ type MPIJobSpec struct {
// +kubebuilder:default:="/root/.ssh"
SSHAuthMountPath string `json:"sshAuthMountPath,omitempty"`

// launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup.
// +kubebuilder:validation:Enum:AtStartup;WaitForWorkersReady
// +kubebuilder:default:=AtStartup
LauncherCreationPolicy LauncherCreationPolicy `json:"launcherCreationPolicy,omitempty"`

// MPIImplementation is the MPI implementation.
// Options are "OpenMPI" (default), "Intel" and "MPICH".
// +kubebuilder:validation:Enum:=OpenMPI;Intel;MPICH
Expand Down
40 changes: 29 additions & 11 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,14 @@ func (c *MPIJobController) syncHandler(key string) error {
}
}
if launcher == nil {
launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{})
if err != nil {
c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
return fmt.Errorf("creating launcher Pod: %w", err)
if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) {
launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{})
if err != nil {
c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
return fmt.Errorf("creating launcher Pod: %w", err)
}
} else {
klog.V(4).Infof("Waiting for workers %s/%s to start.", mpiJob.Namespace, mpiJob.Name)
}
}
}
Expand Down Expand Up @@ -776,6 +780,19 @@ func (c *MPIJobController) getRunningWorkerPods(mpiJob *kubeflow.MPIJob) ([]*cor
return podList, nil
}

func (c *MPIJobController) countReadyWorkerPods(workers []*corev1.Pod) int {
ready := 0
for _, pod := range workers {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
ready++
break
}
}
}
return ready
}

// getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates
// one if it doesn't exist.
func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev1.ConfigMap, error) {
Expand Down Expand Up @@ -1011,14 +1028,15 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
mpiJob.Status.StartTime = &now
}
}
launcherPods, err := c.jobPods(launcher)
if err != nil {
return fmt.Errorf("checking launcher pods running: %w", err)
}
// Job.status.Active accounts for Pending and Running pods. Count running pods
// from the lister instead.
launcherPodsCnt := countRunningPods(launcherPods)
launcherPodsCnt := 0
if launcher != nil {
launcherPods, err := c.jobPods(launcher)
if err != nil {
return fmt.Errorf("checking launcher pods running: %w", err)
}
// Job.status.Active accounts for Pending and Running pods. Count running pods
// from the lister instead.
launcherPodsCnt = countRunningPods(launcherPods)
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeLauncher)
launcherStatus := mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeLauncher]
launcherStatus.Failed = launcher.Status.Failed
Expand Down
1 change: 1 addition & 0 deletions sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 29 additions & 1 deletion sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions test/e2e/mpi_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
})

ginkgo.When("running as non-root", func() {
ginkgo.BeforeEach(func () {
ginkgo.BeforeEach(func() {
mpiJob.Spec.SSHAuthMountPath = "/home/mpiuser/.ssh"

mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
Expand Down Expand Up @@ -283,7 +283,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
})

ginkgo.When("running as non-root", func() {
ginkgo.BeforeEach(func () {
ginkgo.BeforeEach(func() {
mpiJob.Spec.SSHAuthMountPath = "/home/mpiuser/.ssh"

mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
Expand Down
Loading

0 comments on commit f8d815c

Please sign in to comment.