Skip to content

Commit

Permalink
fix comments for mpi-controller (#1485)
Browse files Browse the repository at this point in the history
fix comments for mpi-controller

fix controller-related comments for mpijob

fix comments for mpi-controller

Co-authored-by: hackerboy01 <penglei031303@gmail.com>
  • Loading branch information
hackerboy01 and hackerboy01 committed Nov 29, 2021
1 parent f894870 commit 71428d5
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 64 deletions.
34 changes: 1 addition & 33 deletions manifests/base/cluster-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,13 @@ rules:
- deployments
verbs:
- "*"
# This is needed for the launcher Role.
# This is needed for the launcher role of the MPI operator.
- apiGroups:
- ""
resources:
- pods/exec
verbs:
- create
- apiGroups:
- ""
resources:
- endpoints
verbs:
- create
- get
- update
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand All @@ -80,29 +65,12 @@ rules:
resources:
- configmaps
- secrets
- services
- serviceaccounts
verbs:
- create
- list
- watch
- update
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- create
- list
- update
- watch
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- create
- get
- apiGroups:
- scheduling.volcano.sh
resources:
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/mpi/v1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
EnvKubeflowNamespace = "KUBEFLOW_NAMESPACE"
// DefaultPortName is name of the port used to communicate between Master and Workers.
DefaultPortName = "mpi-port"
// DefaultContainerName is the name of the XGBoostJob container.
// DefaultContainerName is the name of the MPIJob container.
DefaultContainerName = "mpi"
// DefaultPort is default value of the port.
DefaultPort = 9999
Expand Down
20 changes: 7 additions & 13 deletions pkg/apis/mpi/validation/validation.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Kubeflow Authors
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,31 +41,25 @@ func ValidateV1MpiJobSpec(c *mpiv1.MPIJobSpec) error {
break
}
}

if !isValidReplicaType {
return fmt.Errorf("MPIReplicaType is %v but must be one of %v", rType, validReplicaTypes)
}

//Make sure the image is defined in the container
//defaultContainerPresent := false
for _, container := range value.Template.Spec.Containers {
if container.Image == "" {
msg := fmt.Sprintf("MPIReplicaSpec is not valid: Image is undefined in the container of %v", rType)
return fmt.Errorf(msg)
}
// if container.Name == mpiv1.DefaultContainerName {
// defaultContainerPresent = true
// }

if container.Name == "" {
msg := fmt.Sprintf("MPIReplicaSpec is not valid: ImageName is undefined in the container of %v", rType)
return fmt.Errorf(msg)
}
}
//Make sure there has at least one container named "mpi"
// if !defaultContainerPresent {
// msg := fmt.Sprintf("MPIReplicaSpec is not valid: There is no container named %s in %v", mpiv1.DefaultContainerName, rType)
// return fmt.Errorf(msg)
// }
if rType == mpiv1.MPIReplicaTypeLauncher {
launcherExists = true
if value.Replicas != nil && int(*value.Replicas) != 1 {
return fmt.Errorf("MPIReplicaSpec is not valid: There must be only 1 master replica")
return fmt.Errorf("MPIReplicaSpec is not valid: There must be only 1 launcher replica")
}
}

Expand Down
97 changes: 97 additions & 0 deletions pkg/apis/mpi/validation/validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package validation

import (
"testing"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
mpiv1 "github.com/kubeflow/training-operator/pkg/apis/mpi/v1"

v1 "k8s.io/api/core/v1"
)

func TestValidateV1MpiJobSpec(t *testing.T) {
testCases := []mpiv1.MPIJobSpec{
{
MPIReplicaSpecs: nil,
},
{
MPIReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
mpiv1.MPIReplicaTypeLauncher: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{},
},
},
},
},
},
{
MPIReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
mpiv1.MPIReplicaTypeLauncher: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Image: "",
},
},
},
},
},
},
},
{
MPIReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
mpiv1.MPIReplicaTypeLauncher: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "",
Image: "kubeflow/tf-dist-mnist-test:1.0",
},
},
},
},
},
},
},
{
MPIReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
mpiv1.MPIReplicaTypeLauncher: &commonv1.ReplicaSpec{
Replicas: mpiv1.Int32(2),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "tensorflow",
Image: "kubeflow/tf-dist-mnist-test:1.0",
},
},
},
},
},
},
},
}
for _, c := range testCases {
err := ValidateV1MpiJobSpec(&c)
if err == nil {
t.Error("Failed validate the v1.MpiJobSpec")
}
}
}
18 changes: 1 addition & 17 deletions pkg/controller.v1/mpi/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,6 @@ func (r *MPIJobReconciler) ReconcilePods(
return err
}

// Get the PodGroup for this MPIJob
// if c.gangSchedulerName != "" {
// if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas+1); podgroup == nil || err != nil {
// return err
// }
// }

worker, err = r.getOrCreateWorker(mpiJob)
if err != nil {
return err
Expand All @@ -393,7 +386,6 @@ func (r *MPIJobReconciler) ReconcilePods(
}

func (r *MPIJobReconciler) updateMPIJobStatus(mpiJob *mpiv1.MPIJob, launcher *corev1.Pod, worker []*corev1.Pod) error {
//oldStatus := mpiJob.Status.DeepCopy()
if launcher != nil {
initializeMPIJobStatuses(mpiJob, mpiv1.MPIReplicaTypeLauncher)
if isPodSucceeded(launcher) {
Expand Down Expand Up @@ -439,7 +431,6 @@ func (r *MPIJobReconciler) updateMPIJobStatus(mpiJob *mpiv1.MPIJob, launcher *co
)

initializeMPIJobStatuses(mpiJob, mpiv1.MPIReplicaTypeWorker)
//spec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
for i := 0; i < len(worker); i++ {
switch worker[i].Status.Phase {
case corev1.PodFailed:
Expand Down Expand Up @@ -470,11 +461,6 @@ func (r *MPIJobReconciler) updateMPIJobStatus(mpiJob *mpiv1.MPIJob, launcher *co
}
r.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
}

// no need to update the mpijob if the status hasn't changed since last time.
// if !reflect.DeepEqual(*oldStatus, mpiJob.Status) {
// return r.UpdateJobStatusInApiServer(mpiJob, mpiJob.Status)
// }
return nil
}

Expand Down Expand Up @@ -593,7 +579,7 @@ func (r *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv
return err
}
}
// when master is succeed, the job is finished.
// when launcher is succeed, the job is finished.
if expected == 0 {
msg := fmt.Sprintf("MPIJob %s is successfully completed.", mpiJob.Name)
logrus.Info(msg)
Expand Down Expand Up @@ -714,8 +700,6 @@ func (r *MPIJobReconciler) getOrCreateConfigMap(mpiJob *mpiv1.MPIJob, workerRepl
}
updateDiscoverHostsInConfigMap(newCM, mpiJob, podList, isGPULauncher)

//cm, err := r.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix)

cm := &corev1.ConfigMap{}
NamespacedName := types.NamespacedName{Namespace: mpiJob.Namespace, Name: mpiJob.Name + configSuffix}
err = r.Get(context.Background(), NamespacedName, cm)
Expand Down

0 comments on commit 71428d5

Please sign in to comment.