Skip to content

Commit

Permalink
Cherry pick commits (#1757)
Browse files Browse the repository at this point in the history
* Add validation for verifying that the CustomJob (e.g., TFJob) name meets DNS1035 (#1748)

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>

* Fix the success condition of the job in PyTorchJob's Elastic mode. (#1752)

Signed-off-by: Syulin7 <735122171@qq.com>

* Set the default value of CleanPodPolicy to None (#1754)

Signed-off-by: Syulin7 <735122171@qq.com>

* Update mpijob_controller.go (#1755)

fix typo TFJob, should be MPIJob

---------

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
Signed-off-by: Syulin7 <735122171@qq.com>
Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
Co-authored-by: yu lin <37265556+Syulin7@users.noreply.github.com>
Co-authored-by: Yasser Shalabi <yassershalabi@gmail.com>
  • Loading branch information
4 people committed Feb 13, 2023
1 parent b8004ae commit 5a5f92d
Show file tree
Hide file tree
Showing 23 changed files with 840 additions and 269 deletions.
4 changes: 4 additions & 0 deletions pkg/apis/kubeflow.org/v1/defaulting_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ func setTypeNameToCamelCase(replicaSpecs map[commonv1.ReplicaType]*commonv1.Repl
}
}
}

// cleanPodPolicyPointer returns a pointer to a freshly allocated copy of the
// given CleanPodPolicy, so a policy value can be assigned to a pointer field.
func cleanPodPolicyPointer(cleanPodPolicy commonv1.CleanPodPolicy) *commonv1.CleanPodPolicy {
	policy := new(commonv1.CleanPodPolicy)
	*policy = cleanPodPolicy
	return policy
}
4 changes: 2 additions & 2 deletions pkg/apis/kubeflow.org/v1/mxnet_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func setMXNetTypeNamesToCamelCase(mxJob *MXJob) {

// SetDefaults_MXJob sets any unspecified values to defaults.
func SetDefaults_MXJob(mxjob *MXJob) {
// Set default cleanpod policy to All.
// Set default cleanpod policy to None.
if mxjob.Spec.RunPolicy.CleanPodPolicy == nil {
all := commonv1.CleanPodPolicyAll
all := commonv1.CleanPodPolicyNone
mxjob.Spec.RunPolicy.CleanPodPolicy = &all
}

Expand Down
35 changes: 30 additions & 5 deletions pkg/apis/kubeflow.org/v1/mxnet_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestSetDefaults_MXJob(t *testing.T) {
},
},
},
expected: expectedMXNetJob(commonv1.CleanPodPolicyAll, MXJobDefaultRestartPolicy, 1, MXJobDefaultPortName, MXJobDefaultPort),
expected: expectedMXNetJob(commonv1.CleanPodPolicyNone, MXJobDefaultRestartPolicy, 1, MXJobDefaultPortName, MXJobDefaultPort),
},
"Set spec with restart policy": {
original: &MXJob{
Expand All @@ -118,7 +118,7 @@ func TestSetDefaults_MXJob(t *testing.T) {
},
},
},
expected: expectedMXNetJob(commonv1.CleanPodPolicyAll, commonv1.RestartPolicyOnFailure, 1, MXJobDefaultPortName, MXJobDefaultPort),
expected: expectedMXNetJob(commonv1.CleanPodPolicyNone, commonv1.RestartPolicyOnFailure, 1, MXJobDefaultPortName, MXJobDefaultPort),
},
"Set spec with replicas": {
original: &MXJob{
Expand All @@ -140,7 +140,7 @@ func TestSetDefaults_MXJob(t *testing.T) {
},
},
},
expected: expectedMXNetJob(commonv1.CleanPodPolicyAll, MXJobDefaultRestartPolicy, 3, MXJobDefaultPortName, MXJobDefaultPort),
expected: expectedMXNetJob(commonv1.CleanPodPolicyNone, MXJobDefaultRestartPolicy, 3, MXJobDefaultPortName, MXJobDefaultPort),
},

"Set spec with default node port name and port": {
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestSetDefaults_MXJob(t *testing.T) {
},
},
},
expected: expectedMXNetJob(commonv1.CleanPodPolicyAll, MXJobDefaultRestartPolicy, 1, MXJobDefaultPortName, MXJobDefaultPort),
expected: expectedMXNetJob(commonv1.CleanPodPolicyNone, MXJobDefaultRestartPolicy, 1, MXJobDefaultPortName, MXJobDefaultPort),
},

"Set spec with node port": {
Expand Down Expand Up @@ -196,7 +196,32 @@ func TestSetDefaults_MXJob(t *testing.T) {
},
},
},
expected: expectedMXNetJob(commonv1.CleanPodPolicyAll, MXJobDefaultRestartPolicy, 1, MXJobDefaultPortName, 9999),
expected: expectedMXNetJob(commonv1.CleanPodPolicyNone, MXJobDefaultRestartPolicy, 1, MXJobDefaultPortName, 9999),
},

"set spec with cleanpod policy": {
original: &MXJob{
Spec: MXJobSpec{
RunPolicy: commonv1.RunPolicy{
CleanPodPolicy: cleanPodPolicyPointer(commonv1.CleanPodPolicyAll),
},
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeWorker: &commonv1.ReplicaSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: MXJobDefaultContainerName,
Image: testImage,
},
},
},
},
},
},
},
},
expected: expectedMXNetJob(commonv1.CleanPodPolicyAll, MXJobDefaultRestartPolicy, 1, MXJobDefaultPortName, MXJobDefaultPort),
},
}

Expand Down
17 changes: 12 additions & 5 deletions pkg/apis/kubeflow.org/v1/mxnet_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,29 @@ package v1

import (
"fmt"
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
log "github.com/sirupsen/logrus"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
)

// ValidateV1MXJobSpec checks that the kubeflowv1.MXJobSpec is valid.
func ValidateV1MXJobSpec(c *MXJobSpec) error {
return validateMXNetReplicaSpecs(c.MXReplicaSpecs)
// ValidateV1MXJob checks that the given kubeflowv1.MXJob is valid.
//
// The job name is checked with NameIsDNS1035Label first; if that reports any
// problems they are returned as a single error. Otherwise the result of
// validating the job's MXReplicaSpecs is returned (nil when they are valid).
func ValidateV1MXJob(mxJob *MXJob) error {
	nameErrs := apimachineryvalidation.NameIsDNS1035Label(mxJob.ObjectMeta.Name, false)
	if nameErrs != nil {
		return fmt.Errorf("MXJob name is invalid: %v", nameErrs)
	}
	// The name is acceptable, so the outcome is whatever the replica spec check reports.
	return validateMXReplicaSpecs(mxJob.Spec.MXReplicaSpecs)
}

// IsScheduler reports whether typ is the MXJob scheduler replica type
// (MXJobReplicaTypeScheduler).
func IsScheduler(typ commonv1.ReplicaType) bool {
return typ == MXJobReplicaTypeScheduler
}

func validateMXNetReplicaSpecs(specs map[commonv1.ReplicaType]*commonv1.ReplicaSpec) error {
func validateMXReplicaSpecs(specs map[commonv1.ReplicaType]*commonv1.ReplicaSpec) error {
if specs == nil {
return fmt.Errorf("MXJobSpec is not valid")
}
Expand Down
184 changes: 144 additions & 40 deletions pkg/apis/kubeflow.org/v1/mxnet_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,72 +18,176 @@ import (
"testing"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
)

func TestValidateV1MXJobSpec(t *testing.T) {
testCases := []MXJobSpec{
{
MXReplicaSpecs: nil,
func TestValidateV1MXJob(t *testing.T) {
validMXReplicaSpecs := map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeScheduler: {
Replicas: pointer.Int32(1),
RestartPolicy: commonv1.RestartPolicyNever,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "mxnet",
Image: "mxjob/mxnet",
}},
},
},
},
MXJobReplicaTypeServer: {
Replicas: pointer.Int32(1),
RestartPolicy: commonv1.RestartPolicyNever,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "mxnet",
Image: "mxjob/mxnet",
}},
},
},
},
{
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeWorker: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{},
MXJobReplicaTypeWorker: {
Replicas: pointer.Int32(1),
RestartPolicy: commonv1.RestartPolicyNever,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "mxnet",
Image: "mxjob/mxnet",
Command: []string{"python"},
Args: []string{
"/incubator-mxnet/example/image-classification/train_mnist.py",
"--num-epochs=10",
"--num-layers=2",
"--kv-store=dist_device_sync",
},
},
}},
},
},
},
}

testCases := map[string]struct {
MXJob *MXJob
wantErr bool
}{
"valid mxJob": {
MXJob: &MXJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: MXJobSpec{
MXReplicaSpecs: validMXReplicaSpecs,
},
},
wantErr: false,
},
"mxReplicaSpecs is nil": {
MXJob: &MXJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
},
wantErr: true,
},
{
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeWorker: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Image: "",
"mxJob name does not meet DNS1035": {
MXJob: &MXJob{
ObjectMeta: metav1.ObjectMeta{
Name: "10test",
},
Spec: MXJobSpec{
MXReplicaSpecs: validMXReplicaSpecs,
},
},
wantErr: true,
},
"no containers": {
MXJob: &MXJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: MXJobSpec{
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeWorker: {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{},
},
},
},
},
},
},
wantErr: true,
},
{
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeWorker: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "",
Image: "mxjob/mxnet:gpu",
"image is empty": {
MXJob: &MXJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: MXJobSpec{
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeWorker: {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "mxnet",
Image: "",
}},
},
},
},
},
},
},
wantErr: true,
},
{
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeScheduler: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{},
"mxnet default container name doesn't find": {
MXJob: &MXJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: MXJobSpec{
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeWorker: {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "",
Image: "mxjob/mxnet:gpu",
}},
},
},
},
},
},
},
wantErr: true,
},
"replicaSpec is nil": {
MXJob: &MXJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: MXJobSpec{
MXReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
MXJobReplicaTypeScheduler: nil,
},
},
},
wantErr: true,
},
}
for _, c := range testCases {
err := ValidateV1MXJobSpec(&c)
if err.Error() != "MXJobSpec is not valid" {
t.Error("Failed validate the alpha2.MXJobSpec")
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
got := ValidateV1MXJob(tc.MXJob)
if (got != nil) != tc.wantErr {
t.Fatalf("ValidateV1MXJob() error = %v, wantErr %v", got, tc.wantErr)
}
})
}
}
18 changes: 14 additions & 4 deletions pkg/apis/kubeflow.org/v1/paddlepaddle_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,24 @@ import (
"fmt"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
)

func ValidateV1PaddleJobSpec(c *PaddleJobSpec) error {
if c.PaddleReplicaSpecs == nil {
// ValidateV1PaddleJob checks that the given PaddleJob is valid.
//
// The job name is checked with NameIsDNS1035Label first; if that reports any
// problems they are returned as a single error. Otherwise the result of
// validating the job's PaddleReplicaSpecs is returned (nil when they are valid).
func ValidateV1PaddleJob(paddleJob *PaddleJob) error {
	nameErrs := apimachineryvalidation.NameIsDNS1035Label(paddleJob.ObjectMeta.Name, false)
	if nameErrs != nil {
		return fmt.Errorf("PaddleJob name is invalid: %v", nameErrs)
	}
	// The name is acceptable, so the outcome is whatever the replica spec check reports.
	return validatePaddleReplicaSpecs(paddleJob.Spec.PaddleReplicaSpecs)
}

func validatePaddleReplicaSpecs(specs map[commonv1.ReplicaType]*commonv1.ReplicaSpec) error {
if specs == nil {
return fmt.Errorf("PaddleJobSpec is not valid")
}
for rType, value := range c.PaddleReplicaSpecs {
for rType, value := range specs {
if value == nil || len(value.Template.Spec.Containers) == 0 {
return fmt.Errorf("PaddleJobSpec is not valid: containers definition expected in %v", rType)
}
Expand Down Expand Up @@ -63,5 +74,4 @@ func ValidateV1PaddleJobSpec(c *PaddleJobSpec) error {
}

return nil

}
Loading

0 comments on commit 5a5f92d

Please sign in to comment.