Reproduce the missing nodeSelector issue on the PyTorchJob
Signed-off-by: tenzen-y <yuki.iwai.tz@gmail.com>
tenzen-y committed Dec 8, 2023
1 parent 47ebc56 commit 2b1ba5d
Showing 4 changed files with 58 additions and 3 deletions.
9 changes: 7 additions & 2 deletions hack/e2e-test.sh
@@ -63,8 +63,13 @@ function kind_load {
 }
 
 function kueue_deploy {
-  (cd config/components/manager && $KUSTOMIZE edit set image controller=$IMAGE_TAG)
-  kubectl apply --server-side -k test/e2e/config
+  kubectl apply -k github.com/kubeflow/training-operator/manifests/base/crds?ref=master
+  # (cd config/components/manager && $KUSTOMIZE edit set image controller=$IMAGE_TAG)
+  # kubectl apply --server-side -k test/e2e/config
+  kubectl apply --server-side -k config/default
+  kubectl wait -n kueue-system --for=condition=available deployment/kueue-controller-manager --timeout=120s
+  sleep 30
+  kubectl apply -f site/static/examples/jobs/sample-pytorchjob.yaml
 }
 
 trap cleanup EXIT
8 changes: 8 additions & 0 deletions pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
@@ -82,6 +82,14 @@ func MakePyTorchJob(name, ns string) *PyTorchJobWrapper {
     }}
 }
 
+func (j *PyTorchJobWrapper) Image(img string, args []string) *PyTorchJobWrapper {
+    j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Image = img
+    j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Args = args
+    j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Image = img
+    j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Args = args
+    return j
+}
+
 // PriorityClass updates job priorityclass.
 func (j *PyTorchJobWrapper) PriorityClass(pc string) *PyTorchJobWrapper {
     if j.Spec.RunPolicy.SchedulingPolicy == nil {
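Note: the new Image helper sets the same image and args on both the master and worker containers, so e2e tests can run a binary that exits. A sketch of the intended usage, taken from the commented-out reproduction path in e2e_test.go below (the queue name, CPU requests, and suite variables such as ns, ctx, and k8sClient are that test's values, not new API):

sampleJob := testingpytorchjob.MakePyTorchJob("test-job", ns.Name).
    Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"2s"}). // a binary that ends
    Queue(localQueue.Name).
    Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "0.3").
    Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "0.3").
    Obj()
gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())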
40 changes: 39 additions & 1 deletion test/e2e/e2e_test.go
@@ -19,6 +19,7 @@ package e2e
 import (
     "github.com/google/go-cmp/cmp"
     "github.com/google/go-cmp/cmp/cmpopts"
+    kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
     "github.com/onsi/ginkgo/v2"
     "github.com/onsi/gomega"
     batchv1 "k8s.io/api/batch/v1"
@@ -31,7 +32,6 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/utils/ptr"
     "sigs.k8s.io/controller-runtime/pkg/client"
-
     kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
     visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1"
     workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
@@ -686,6 +686,32 @@ var _ = ginkgo.Describe("Kueue", func() {
         }, util.LongTimeout, util.Interval).Should(gomega.BeTrue())
     })
 
+    ginkgo.FIt("Should unsuspend a PyTorchJob and set nodeSelectors", func() {
+        // Use a binary that ends.
+        //sampleJob := testingpytorchjob.MakePyTorchJob("test-job", ns.Name).
+        //    Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"2s"}).
+        //    Queue(localQueue.Name).
+        //    Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "0.3").
+        //    Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "0.3").
+        //    Obj()
+        //gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())
+
+        //time.Sleep(time.Minute * 100)
+        createdWorkload := &kueue.Workload{}
+        jobKey := types.NamespacedName{Name: "pytorch-simple", Namespace: "default"}
+        expectPyTorchJobUnsuspendedWithNodeSelectors(jobKey, map[string]string{
+            "instance-type": "on-demand",
+        })
+        wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobKey.Name), Namespace: ns.Name}
+        gomega.Eventually(func() bool {
+            if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
+                return false
+            }
+            return workload.HasQuotaReservation(createdWorkload) &&
+                apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadFinished)
+        }, util.LongTimeout, util.Interval).Should(gomega.BeTrue())
+    })
+
     ginkgo.It("Should readmit preempted job with priorityClass into a separate flavor", func() {
         gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())
 
@@ -974,6 +1000,18 @@ func expectJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) {
     }, util.Timeout, util.Interval).Should(gomega.Equal([]any{false, ns}))
 }
 
+func expectPyTorchJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) {
+    job := &kftraining.PyTorchJob{}
+    gomega.EventuallyWithOffset(1, func() []any {
+        gomega.Expect(k8sClient.Get(ctx, key, job)).To(gomega.Succeed())
+        return []any{
+            *job.Spec.RunPolicy.Suspend,
+            job.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.NodeSelector,
+            job.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.NodeSelector,
+        }
+    }, util.Timeout, util.Interval).Should(gomega.Equal([]any{false, ns, ns}))
+}
+
 func defaultOwnerReferenceForJob(name string) []metav1.OwnerReference {
     return []metav1.OwnerReference{
         {
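For context, the behavior the focused test asserts: on admission, Kueue should flip RunPolicy.Suspend to false and inject the assigned flavor's node labels (here instance-type: on-demand) into every replica's pod template. A toy sketch of that expectation, not Kueue's actual admission code path:

// admit models what the test expects to happen to a suspended PyTorchJob:
// unsuspend it and merge the flavor's node labels into each replica's
// pod template. Illustrative only; names and logic are assumptions.
func admit(job *kftraining.PyTorchJob, nodeLabels map[string]string) {
    job.Spec.RunPolicy.Suspend = ptr.To(false)
    for _, rs := range job.Spec.PyTorchReplicaSpecs {
        if rs.Template.Spec.NodeSelector == nil {
            rs.Template.Spec.NodeSelector = map[string]string{}
        }
        for k, v := range nodeLabels {
            rs.Template.Spec.NodeSelector[k] = v
        }
    }
}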
4 changes: 4 additions & 0 deletions test/e2e/suite_test.go
@@ -19,6 +19,7 @@ package e2e
 import (
     "context"
     "fmt"
+    kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
     "os"
     "testing"
     "time"
@@ -74,6 +75,9 @@ func CreateClientUsingCluster() client.Client {
     err = kueue.AddToScheme(scheme.Scheme)
     gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
 
+    err = kftraining.AddToScheme(scheme.Scheme)
+    gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
+
     err = visibility.AddToScheme(scheme.Scheme)
     gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
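Registering the kubeflow.org/v1 types is what lets the suite's k8sClient decode PyTorchJob objects; without it, Get calls fail with a "no kind is registered" scheme error. A standalone sketch of the same setup against a live cluster (kubeconfig-based and separate from the suite; the object name matches the sample job applied by hack/e2e-test.sh):

package main

import (
    "context"
    "fmt"

    kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
    // Without this call, Get on a *kftraining.PyTorchJob fails at decode time.
    if err := kftraining.AddToScheme(scheme.Scheme); err != nil {
        panic(err)
    }
    c, err := client.New(ctrl.GetConfigOrDie(), client.Options{Scheme: scheme.Scheme})
    if err != nil {
        panic(err)
    }
    job := &kftraining.PyTorchJob{}
    if err := c.Get(context.Background(), types.NamespacedName{Name: "pytorch-simple", Namespace: "default"}, job); err != nil {
        panic(err)
    }
    fmt.Println("suspended:", *job.Spec.RunPolicy.Suspend)
}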
