From 15948bcf55faf05ca02bc30af6368c0ab7728a82 Mon Sep 17 00:00:00 2001 From: huyuanzhe Date: Wed, 12 Oct 2022 19:38:42 +0800 Subject: [PATCH 1/6] [ray job] support stop job after cr is deleted in cluster selector mode --- .../controllers/ray/rayjob_controller.go | 30 ++++++++++++++++ .../ray/utils/dashboard_httpclient.go | 34 +++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 05a0881fdd..8aefaf778f 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -3,6 +3,8 @@ package ray import ( "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" "time" "k8s.io/client-go/tools/record" @@ -255,6 +257,34 @@ func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&rayv1alpha1.RayJob{}). Owns(&rayv1alpha1.RayCluster{}). Owns(&corev1.Service{}). + WithEventFilter(predicate.Funcs{ + DeleteFunc: func(e event.DeleteEvent) bool { + r.Log.Info("event to delete", "event", e) + + job, ok := e.Object.(*rayv1alpha1.RayJob) + if !ok { + r.Log.Info("failed to get job object from event", "event", e) + return false + } + + if job.Status.JobStatus == rayv1alpha1.JobStatusRunning || job.Status.JobStatus == rayv1alpha1.JobStatusPending { + if job.Status.DashboardURL == "" || job.Status.JobId == "" { + r.Log.Info("dashboardURL or job_id is empty", "job", job) + return false + } + rayDashboardClient := utils.GetRayDashboardClientFunc() + rayDashboardClient.InitClient(job.Status.DashboardURL) + err := rayDashboardClient.StopJob(job.Status.JobId, &r.Log) + if err != nil { + r.Log.Info("failed to stop job", "error", err) + } + } + // The reconciler adds a finalizer so we perform clean-up + // when the delete timestamp is added + // Suppress Delete events to avoid filtering them out in the Reconcile function + return false + }, + }). Complete(r) } diff --git a/ray-operator/controllers/ray/utils/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboard_httpclient.go index 5ecadac82a..9ae2ccb7c9 100644 --- a/ray-operator/controllers/ray/utils/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboard_httpclient.go @@ -81,6 +81,7 @@ type RayDashboardClientInterface interface { ConvertServeConfig(specs []rayv1alpha1.ServeConfigSpec) []ServeConfigSpec GetJobInfo(jobId string) (*RayJobInfo, error) SubmitJob(rayJob *rayv1alpha1.RayJob, log *logr.Logger) (jobId string, err error) + StopJob(jobName string, log *logr.Logger) (err error) } // GetRayDashboardClientFunc Used for unit tests. @@ -315,6 +316,10 @@ type RayJobResponse struct { JobId string `json:"job_id"` } +type RayJobStopResponse struct { + Stopped bool `json:"stopped"` +} + func (r *RayDashboardClient) GetJobInfo(jobId string) (*RayJobInfo, error) { req, err := http.NewRequest("GET", r.dashboardURL+JobPath+jobId, nil) if err != nil { @@ -377,6 +382,35 @@ func (r *RayDashboardClient) SubmitJob(rayJob *rayv1alpha1.RayJob, log *logr.Log return jobResp.JobId, nil } +func (r *RayDashboardClient) StopJob(jobName string, log *logr.Logger) (err error) { + log.Info("Stop a ray job", "rayJob", jobName) + + req, err := http.NewRequest(http.MethodPost, r.dashboardURL+JobPath + jobName + "/stop", nil) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + resp, err := r.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + body, _ := ioutil.ReadAll(resp.Body) + + var jobStopResp RayJobStopResponse + if err = json.Unmarshal(body, &jobStopResp); err != nil { + return err + } + + if !jobStopResp.Stopped { + return fmt.Errorf("failed to stopped job: %v", jobName) + } + return nil +} + + func ConvertRayJobToReq(rayJob *rayv1alpha1.RayJob) (*RayJobRequest, error) { req := &RayJobRequest{ Entrypoint: rayJob.Spec.Entrypoint, From 237f91e99031c3aed93e23551cbe5f1a5410c3e9 Mon Sep 17 00:00:00 2001 From: huyuanzhe Date: Wed, 12 Oct 2022 20:46:44 +0800 Subject: [PATCH 2/6] UT --- ray-operator/controllers/ray/rayjob_controller.go | 3 ++- ray-operator/controllers/ray/utils/dashboard_httpclient.go | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 8aefaf778f..cf25844872 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -3,9 +3,10 @@ package ray import ( "context" "fmt" + "time" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" - "time" "k8s.io/client-go/tools/record" diff --git a/ray-operator/controllers/ray/utils/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboard_httpclient.go index 9ae2ccb7c9..199c97c0de 100644 --- a/ray-operator/controllers/ray/utils/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboard_httpclient.go @@ -385,7 +385,7 @@ func (r *RayDashboardClient) SubmitJob(rayJob *rayv1alpha1.RayJob, log *logr.Log func (r *RayDashboardClient) StopJob(jobName string, log *logr.Logger) (err error) { log.Info("Stop a ray job", "rayJob", jobName) - req, err := http.NewRequest(http.MethodPost, r.dashboardURL+JobPath + jobName + "/stop", nil) + req, err := http.NewRequest(http.MethodPost, r.dashboardURL+JobPath+jobName+"/stop", nil) if err != nil { return err } @@ -410,7 +410,6 @@ func (r *RayDashboardClient) StopJob(jobName string, log *logr.Logger) (err erro return nil } - func ConvertRayJobToReq(rayJob *rayv1alpha1.RayJob) (*RayJobRequest, error) { req := &RayJobRequest{ Entrypoint: rayJob.Spec.Entrypoint, From 3857aac7e05b867422cba1a10d9b6631775dda28 Mon Sep 17 00:00:00 2001 From: huyuanzhe Date: Tue, 18 Oct 2022 19:23:26 +0800 Subject: [PATCH 3/6] fix for UT --- .../controllers/ray/rayjob_controller.go | 54 ++++++++++--------- .../ray/utils/dashboard_httpclient_test.go | 17 ++++++ .../ray/utils/fake_serve_httpclient.go | 4 ++ 3 files changed, 49 insertions(+), 26 deletions(-) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index cf25844872..030bc21468 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -259,36 +259,38 @@ func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&rayv1alpha1.RayCluster{}). Owns(&corev1.Service{}). WithEventFilter(predicate.Funcs{ - DeleteFunc: func(e event.DeleteEvent) bool { - r.Log.Info("event to delete", "event", e) - - job, ok := e.Object.(*rayv1alpha1.RayJob) - if !ok { - r.Log.Info("failed to get job object from event", "event", e) - return false - } - - if job.Status.JobStatus == rayv1alpha1.JobStatusRunning || job.Status.JobStatus == rayv1alpha1.JobStatusPending { - if job.Status.DashboardURL == "" || job.Status.JobId == "" { - r.Log.Info("dashboardURL or job_id is empty", "job", job) - return false - } - rayDashboardClient := utils.GetRayDashboardClientFunc() - rayDashboardClient.InitClient(job.Status.DashboardURL) - err := rayDashboardClient.StopJob(job.Status.JobId, &r.Log) - if err != nil { - r.Log.Info("failed to stop job", "error", err) - } - } - // The reconciler adds a finalizer so we perform clean-up - // when the delete timestamp is added - // Suppress Delete events to avoid filtering them out in the Reconcile function - return false - }, + DeleteFunc: r.DeleteEventFilter, }). Complete(r) } +func (r *RayJobReconciler) DeleteEventFilter(e event.DeleteEvent) bool { + r.Log.Info("event to delete", "event", e) + + job, ok := e.Object.(*rayv1alpha1.RayJob) + if !ok { + r.Log.Info("failed to get job object from event", "event", e) + return false + } + + if job.Status.JobStatus == rayv1alpha1.JobStatusRunning || job.Status.JobStatus == rayv1alpha1.JobStatusPending { + if job.Status.DashboardURL == "" || job.Status.JobId == "" { + r.Log.Info("dashboardURL or job_id is empty", "job", job) + return false + } + rayDashboardClient := utils.GetRayDashboardClientFunc() + rayDashboardClient.InitClient(job.Status.DashboardURL) + err := rayDashboardClient.StopJob(job.Status.JobId, &r.Log) + if err != nil { + r.Log.Info("failed to stop job", "error", err) + } + } + // The reconciler adds a finalizer so we perform clean-up + // when the delete timestamp is added + // Suppress Delete events to avoid filtering them out in the Reconcile function + return false +} + func (r *RayJobReconciler) getRayJobInstance(ctx context.Context, request ctrl.Request) (*rayv1alpha1.RayJob, error) { rayJobInstance := &rayv1alpha1.RayJob{} if err := r.Get(ctx, request.NamespacedName, rayJobInstance); err != nil { diff --git a/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go b/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go index d563257298..e0be6489e3 100644 --- a/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go +++ b/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go @@ -82,4 +82,21 @@ var _ = Describe("RayFrameworkGenerator", func() { Expect(rayJobInfo.Entrypoint).To(Equal(rayJob.Spec.Entrypoint)) Expect(rayJobInfo.JobStatus).To(Equal(rayv1alpha1.JobStatusRunning)) }) + + It("Test stop job", func() { + httpmock.Activate() + defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("POST", rayDashboardClient.dashboardURL+JobPath+"stop-job-1/stop", + func(req *http.Request) (*http.Response, error) { + body := &RayJobStopResponse{ + Stopped: true, + } + bodyBytes, _ := json.Marshal(body) + return httpmock.NewBytesResponse(200, bodyBytes), nil + }) + + err := rayDashboardClient.StopJob("stop-job-1", &ctrl.Log) + Expect(err).To(BeNil()) + }) + }) diff --git a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go index 5bb6688b8e..07161f0306 100644 --- a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go +++ b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go @@ -87,3 +87,7 @@ func (r *FakeRayDashboardClient) GetJobInfo(jobId string) (*RayJobInfo, error) { func (r *FakeRayDashboardClient) SubmitJob(rayJob *rayv1alpha1.RayJob, log *logr.Logger) (jobId string, err error) { return "", nil } + +func (r *FakeRayDashboardClient) StopJob(jobName string, log *logr.Logger) (err error) { + return nil +} From 0159a9498fa9eb4b808072611ce700235c78d5c5 Mon Sep 17 00:00:00 2001 From: huyuanzhe Date: Tue, 18 Oct 2022 19:33:42 +0800 Subject: [PATCH 4/6] fix for UT --- ray-operator/controllers/ray/utils/dashboard_httpclient_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go b/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go index e0be6489e3..bc292cbe0e 100644 --- a/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go +++ b/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go @@ -98,5 +98,4 @@ var _ = Describe("RayFrameworkGenerator", func() { err := rayDashboardClient.StopJob("stop-job-1", &ctrl.Log) Expect(err).To(BeNil()) }) - }) From 849df404144fac51aae58a32de2a10c355171549 Mon Sep 17 00:00:00 2001 From: huyuanzhe Date: Tue, 18 Oct 2022 21:59:11 +0800 Subject: [PATCH 5/6] fix --- .../ray/utils/dashboard_httpclient_test.go | 1 + .../controllers/ray/utils/utils_suite_test.go | 13 +++++++++++++ 2 files changed, 14 insertions(+) create mode 100644 ray-operator/controllers/ray/utils/utils_suite_test.go diff --git a/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go b/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go index bc292cbe0e..5b717034a6 100644 --- a/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go +++ b/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go @@ -41,6 +41,7 @@ var _ = Describe("RayFrameworkGenerator", func() { RuntimeEnv: encodedRuntimeEnv, }, } + rayDashboardClient = &RayDashboardClient{} rayDashboardClient.InitClient("127.0.0.1:8090") }) diff --git a/ray-operator/controllers/ray/utils/utils_suite_test.go b/ray-operator/controllers/ray/utils/utils_suite_test.go new file mode 100644 index 0000000000..b211ec5185 --- /dev/null +++ b/ray-operator/controllers/ray/utils/utils_suite_test.go @@ -0,0 +1,13 @@ +package utils_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestUtils(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Utils Suite") +} From 172fad50bcabc6f13773ff9295e7cc2fc8b7709e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E5=85=83=E5=93=B2?= Date: Fri, 28 Oct 2022 14:24:11 +0800 Subject: [PATCH 6/6] fix for dashboard UT --- ray-operator/controllers/ray/utils/dashboard_httpclient_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go b/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go index 5b717034a6..2c0e86af1a 100644 --- a/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go +++ b/ray-operator/controllers/ray/utils/dashboard_httpclient_test.go @@ -63,7 +63,7 @@ var _ = Describe("RayFrameworkGenerator", func() { bodyBytes, _ := json.Marshal(body) return httpmock.NewBytesResponse(200, bodyBytes), nil }) - httpmock.RegisterResponder("Get", rayDashboardClient.dashboardURL+JobPath+expectJobId, + httpmock.RegisterResponder("GET", rayDashboardClient.dashboardURL+JobPath+expectJobId, func(req *http.Request) (*http.Response, error) { body := &RayJobInfo{ JobStatus: rayv1alpha1.JobStatusRunning,