diff --git a/src/codeflare_sdk.egg-info/SOURCES.txt b/src/codeflare_sdk.egg-info/SOURCES.txt
index cfea1dbf..d922d0db 100644
--- a/src/codeflare_sdk.egg-info/SOURCES.txt
+++ b/src/codeflare_sdk.egg-info/SOURCES.txt
@@ -14,8 +14,10 @@ src/codeflare_sdk/cluster/config.py
 src/codeflare_sdk/cluster/model.py
 src/codeflare_sdk/job/__init__.py
 src/codeflare_sdk/job/jobs.py
+src/codeflare_sdk/job/ray_jobs.py
 src/codeflare_sdk/utils/__init__.py
 src/codeflare_sdk/utils/generate_cert.py
 src/codeflare_sdk/utils/generate_yaml.py
 src/codeflare_sdk/utils/kube_api_helpers.py
+src/codeflare_sdk/utils/openshift_oauth.py
 src/codeflare_sdk/utils/pretty_print.py
diff --git a/src/codeflare_sdk/job/ray_jobs.py b/src/codeflare_sdk/job/ray_jobs.py
new file mode 100644
index 00000000..ff1ebdfe
--- /dev/null
+++ b/src/codeflare_sdk/job/ray_jobs.py
@@ -0,0 +1,149 @@
+# Copyright 2022 IBM, Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+The ray_jobs sub-module contains methods needed to submit jobs and connect to Ray Clusters that were not created by CodeFlare.
+The SDK acts as a wrapper for the Ray Job Submission Client.
+"""
+from ray.job_submission import JobSubmissionClient
+from ray.dashboard.modules.job.pydantic_models import JobDetails
+from typing import Iterator, Optional, Dict, Any, Union, List, Tuple
+
+
+class RayJobClient:
+    """
+    A class that functions as a wrapper for the Ray Job Submission Client.
+
+    parameters:
+    address -- Either (1) the address of the Ray cluster, or (2) the HTTP address of the dashboard server on the head node, e.g. "http://<head-node-host>:8265". In case (1) it must be specified as an address that can be passed to ray.init(),
+    e.g. a Ray Client address (ray://<head_node_host>:10001), or "auto", or "localhost:<port>". If unspecified, will try to connect to a running local Ray cluster. This argument is always overridden by the RAY_ADDRESS environment variable.
+    create_cluster_if_needed -- Indicates whether the cluster at the specified address needs to already be running. Ray doesn't start a cluster before interacting with jobs, but third-party job managers may do so.
+    cookies -- Cookies to use when sending requests to the HTTP job server.
+    metadata -- Arbitrary metadata to store along with all jobs. New metadata specified per job will be merged with the global metadata provided here via a simple dict update.
+    headers -- Headers to use when sending requests to the HTTP job server, used for cases like authentication to a remote cluster.
+    verify -- Boolean indication to verify the server's TLS certificate or a path to a file or directory of trusted certificates. Default: True.
+    """
+
+    def __init__(
+        self,
+        address: Optional[str] = None,
+        create_cluster_if_needed: bool = False,
+        cookies: Optional[Dict[str, Any]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, Any]] = None,
+        verify: Optional[Union[str, bool]] = True,
+    ):
+        self.rayJobClient = JobSubmissionClient(
+            address=address,
+            create_cluster_if_needed=create_cluster_if_needed,
+            cookies=cookies,
+            metadata=metadata,
+            headers=headers,
+            verify=verify,
+        )
+
+    def submit_job(
+        self,
+        entrypoint: str,
+        job_id: Optional[str] = None,
+        runtime_env: Optional[Dict[str, Any]] = None,
+        metadata: Optional[Dict[str, str]] = None,
+        submission_id: Optional[str] = None,
+        entrypoint_num_cpus: Optional[Union[int, float]] = None,
+        entrypoint_num_gpus: Optional[Union[int, float]] = None,
+        entrypoint_resources: Optional[Dict[str, float]] = None,
+    ) -> str:
+        """
+        Method for submitting jobs to a Ray Cluster and returning the job id with entrypoint being a mandatory field.
+
+        Parameters:
+        entrypoint -- The shell command to run for this job.
+        submission_id -- A unique ID for this job.
+        runtime_env -- The runtime environment to install and run this job in.
+        metadata -- Arbitrary data to store along with this job.
+        job_id -- DEPRECATED. This has been renamed to submission_id
+        entrypoint_num_cpus -- The quantity of CPU cores to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.
+        entrypoint_num_gpus -- The quantity of GPUs to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.
+        entrypoint_resources -- The quantity of custom resources to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it.
+        """
+        return self.rayJobClient.submit_job(
+            entrypoint=entrypoint,
+            job_id=job_id,
+            runtime_env=runtime_env,
+            metadata=metadata,
+            submission_id=submission_id,
+            entrypoint_num_cpus=entrypoint_num_cpus,
+            entrypoint_num_gpus=entrypoint_num_gpus,
+            entrypoint_resources=entrypoint_resources,
+        )
+
+    def delete_job(self, job_id: str) -> Tuple[bool, str]:
+        """
+        Method for deleting jobs with the job id being a mandatory field.
+        """
+        deletion_status = self.rayJobClient.delete_job(job_id=job_id)
+
+        if deletion_status:
+            message = f"Successfully deleted Job {job_id}"
+        else:
+            message = f"Failed to delete Job {job_id}"
+
+        return deletion_status, message
+
+    def get_address(self) -> str:
+        """
+        Method for getting the address from the RayJobClient
+        """
+        return self.rayJobClient.get_address()
+
+    def get_job_info(self, job_id: str):
+        """
+        Method for getting the job info with the job id being a mandatory field.
+        """
+        return self.rayJobClient.get_job_info(job_id=job_id)
+
+    def get_job_logs(self, job_id: str) -> str:
+        """
+        Method for getting the job logs with the job id being a mandatory field.
+        """
+        return self.rayJobClient.get_job_logs(job_id=job_id)
+
+    def get_job_status(self, job_id: str) -> str:
+        """
+        Method for getting the job's status with the job id being a mandatory field.
+        """
+        return self.rayJobClient.get_job_status(job_id=job_id)
+
+    def list_jobs(self) -> List[JobDetails]:
+        """
+        Method for getting a list of current jobs in the Ray Cluster.
+        """
+        return self.rayJobClient.list_jobs()
+
+    def stop_job(self, job_id: str) -> Tuple[bool, str]:
+        """
+        Method for stopping a job with the job id being a mandatory field.
+        """
+        stop_job_status = self.rayJobClient.stop_job(job_id=job_id)
+        if stop_job_status:
+            message = f"Successfully stopped Job {job_id}"
+        else:
+            message = f"Failed to stop Job, {job_id} could have already completed."
+        return stop_job_status, message
+
+    def tail_job_logs(self, job_id: str) -> Iterator[str]:
+        """
+        Method for getting an iterator that follows the logs of a job with the job id being a mandatory field.
+        """
+        return self.rayJobClient.tail_job_logs(job_id=job_id)
diff --git a/tests/unit_test.py b/tests/unit_test.py
index bc946538..c33b95ab 100644
--- a/tests/unit_test.py
+++ b/tests/unit_test.py
@@ -103,6 +103,7 @@
 from unittest.mock import MagicMock
 from pytest_mock import MockerFixture
 from ray.job_submission import JobSubmissionClient
+from codeflare_sdk.job.ray_jobs import RayJobClient
 
 # For mocking openshift client results
 fake_res = openshift.Result("fake")
@@ -2846,6 +2847,161 @@ def test_gen_app_wrapper_with_oauth(mocker: MockerFixture):
     )
 
 
+"""
+Ray Jobs tests
+"""
+
+
+# rjc == RayJobClient
+@pytest.fixture
+def ray_job_client(mocker):
+    # Creating a fixture to instantiate RayJobClient with a mocked JobSubmissionClient
+    mocker.patch.object(JobSubmissionClient, "__init__", return_value=None)
+    return RayJobClient(
+        "https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org"
+    )
+
+
+def test_rjc_submit_job(ray_job_client, mocker):
+    mocked_submit_job = mocker.patch.object(
+        JobSubmissionClient, "submit_job", return_value="mocked_submission_id"
+    )
+    submission_id = ray_job_client.submit_job(entrypoint={"pip": ["numpy"]})
+
+    mocked_submit_job.assert_called_once_with(
+        entrypoint={"pip": ["numpy"]},
+        job_id=None,
+        runtime_env=None,
+        metadata=None,
+        submission_id=None,
+        entrypoint_num_cpus=None,
+        entrypoint_num_gpus=None,
+        entrypoint_resources=None,
+    )
+
+    assert submission_id == "mocked_submission_id"
+
+
+def test_rjc_delete_job(ray_job_client, mocker):
+    # Case return True
+    mocked_delete_job_True = mocker.patch.object(
+        JobSubmissionClient, "delete_job", return_value=True
+    )
+    result = ray_job_client.delete_job(job_id="mocked_job_id")
+
+    mocked_delete_job_True.assert_called_once_with(job_id="mocked_job_id")
+    assert result == (True, "Successfully deleted Job mocked_job_id")
+
+    # Case return False
+    mocked_delete_job_False = mocker.patch.object(
+        JobSubmissionClient, "delete_job", return_value=(False)
+    )
+    result = ray_job_client.delete_job(job_id="mocked_job_id")
+
+    mocked_delete_job_False.assert_called_once_with(job_id="mocked_job_id")
+    assert result == (False, "Failed to delete Job mocked_job_id")
+
+
+def test_rjc_stop_job(ray_job_client, mocker):
+    # Case return True
+    mocked_stop_job_True = mocker.patch.object(
+        JobSubmissionClient, "stop_job", return_value=(True)
+    )
+    result = ray_job_client.stop_job(job_id="mocked_job_id")
+
+    mocked_stop_job_True.assert_called_once_with(job_id="mocked_job_id")
+    assert result == (True, "Successfully stopped Job mocked_job_id")
+
+    # Case return False
+    mocked_stop_job_False = mocker.patch.object(
+        JobSubmissionClient, "stop_job", return_value=(False)
+    )
+    result = ray_job_client.stop_job(job_id="mocked_job_id")
+
+    mocked_stop_job_False.assert_called_once_with(job_id="mocked_job_id")
+    assert result == (
+        False,
+        "Failed to stop Job, mocked_job_id could have already completed.",
+    )
+
+
+def test_rjc_address(ray_job_client, mocker):
+    mocked_rjc_address = mocker.patch.object(
+        JobSubmissionClient,
+        "get_address",
+        return_value="https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org",
+    )
+    address = ray_job_client.get_address()
+
+    mocked_rjc_address.assert_called_once()
+    assert (
+        address
+        == "https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org"
+    )
+
+
+def test_rjc_get_job_logs(ray_job_client, mocker):
+    mocked_rjc_get_job_logs = mocker.patch.object(
+        JobSubmissionClient, "get_job_logs", return_value="Logs"
+    )
+    logs = ray_job_client.get_job_logs(job_id="mocked_job_id")
+
+    mocked_rjc_get_job_logs.assert_called_once_with(job_id="mocked_job_id")
+    assert logs == "Logs"
+
+
+def test_rjc_get_job_info(ray_job_client, mocker):
+    job_details_example = "JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id=None, submission_id='mocked_submission_id', driver_info=None, status=<JobStatus.PENDING: 'PENDING'>, entrypoint='python test.py', message='Job has not started yet. It may be waiting for the runtime environment to be set up.', error_type=None, start_time=1701271760641, end_time=None, metadata={}, runtime_env={'working_dir': 'gcs://_ray_pkg_67de6f0e60d43b19.zip', 'pip': {'packages': ['numpy'], 'pip_check': False}, '_ray_commit': 'b4bba4717f5ba04ee25580fe8f88eed63ef0c5dc'}, driver_agent_http_address=None, driver_node_id=None)"
+    mocked_rjc_get_job_info = mocker.patch.object(
+        JobSubmissionClient, "get_job_info", return_value=job_details_example
+    )
+    job_details = ray_job_client.get_job_info(job_id="mocked_job_id")
+
+    mocked_rjc_get_job_info.assert_called_once_with(job_id="mocked_job_id")
+    assert job_details == job_details_example
+
+
+def test_rjc_get_job_status(ray_job_client, mocker):
+    job_status_example = "<JobStatus.SUCCEEDED: 'SUCCEEDED'>"
+    mocked_rjc_get_job_status = mocker.patch.object(
+        JobSubmissionClient, "get_job_status", return_value=job_status_example
+    )
+    job_status = ray_job_client.get_job_status(job_id="mocked_job_id")
+
+    mocked_rjc_get_job_status.assert_called_once_with(job_id="mocked_job_id")
+    assert job_status == job_status_example
+
+
+def test_rjc_tail_job_logs(ray_job_client, mocker):
+    logs_example = [
+        "Job started...",
+        "Processing input data...",
+        "Finalizing results...",
+        "Job completed successfully.",
+    ]
+    mocked_rjc_tail_job_logs = mocker.patch.object(
+        JobSubmissionClient, "tail_job_logs", return_value=logs_example
+    )
+    job_tail_job_logs = ray_job_client.tail_job_logs(job_id="mocked_job_id")
+
+    mocked_rjc_tail_job_logs.assert_called_once_with(job_id="mocked_job_id")
+    assert job_tail_job_logs == logs_example
+
+
+def test_rjc_list_jobs(ray_job_client, mocker):
+    jobs_list = [
+        "JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id=None, submission_id='raysubmit_4k2NYS1YbRXYPZCM', driver_info=None, status=<JobStatus.SUCCEEDED: 'SUCCEEDED'>, entrypoint='python mnist.py', message='Job finished successfully.', error_type=None, start_time=1701352132585, end_time=1701352192002, metadata={}, runtime_env={'working_dir': 'gcs://_ray_pkg_6200b93a110e8033.zip', 'pip': {'packages': ['pytorch_lightning==1.5.10', 'ray_lightning', 'torchmetrics==0.9.1', 'torchvision==0.12.0'], 'pip_check': False}, '_ray_commit': 'b4bba4717f5ba04ee25580fe8f88eed63ef0c5dc'}, driver_agent_http_address='http://10.131.0.18:52365', driver_node_id='9fb515995f5fb13ad4db239ceea378333bebf0a2d45b6aa09d02e691')",
+        "JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id=None, submission_id='raysubmit_iRuwU8vdkbUZZGvT', driver_info=None, status=<JobStatus.STOPPED: 'STOPPED'>, entrypoint='python mnist.py', message='Job was intentionally stopped.', error_type=None, start_time=1701353096163, end_time=1701353097733, metadata={}, runtime_env={'working_dir': 'gcs://_ray_pkg_6200b93a110e8033.zip', 'pip': {'packages': ['pytorch_lightning==1.5.10', 'ray_lightning', 'torchmetrics==0.9.1', 'torchvision==0.12.0'], 'pip_check': False}, '_ray_commit': 'b4bba4717f5ba04ee25580fe8f88eed63ef0c5dc'}, driver_agent_http_address='http://10.131.0.18:52365', driver_node_id='9fb515995f5fb13ad4db239ceea378333bebf0a2d45b6aa09d02e691')",
+    ]
+    mocked_rjc_list_jobs = mocker.patch.object(
+        JobSubmissionClient, "list_jobs", return_value=jobs_list
+    )
+    job_list_jobs = ray_job_client.list_jobs()
+
+    mocked_rjc_list_jobs.assert_called_once()
+    assert job_list_jobs == jobs_list
+
+
 # Make sure to always keep this function last
 def test_cleanup():
     os.remove(f"{aw_dir}unit-test-cluster.yaml")