Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/codeflare_sdk.egg-info/SOURCES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ src/codeflare_sdk/cluster/config.py
src/codeflare_sdk/cluster/model.py
src/codeflare_sdk/job/__init__.py
src/codeflare_sdk/job/jobs.py
src/codeflare_sdk/job/ray_jobs.py
src/codeflare_sdk/utils/__init__.py
src/codeflare_sdk/utils/generate_cert.py
src/codeflare_sdk/utils/generate_yaml.py
src/codeflare_sdk/utils/kube_api_helpers.py
src/codeflare_sdk/utils/openshift_oauth.py
src/codeflare_sdk/utils/pretty_print.py
149 changes: 149 additions & 0 deletions src/codeflare_sdk/job/ray_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright 2022 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The ray_jobs sub-module contains methods needed to submit jobs and connect to Ray Clusters that were not created by CodeFlare.
The SDK acts as a wrapper for the Ray Job Submission Client.
"""
from ray.job_submission import JobSubmissionClient
from ray.dashboard.modules.job.pydantic_models import JobDetails
from typing import Iterator, Optional, Dict, Any, Union, List


class RayJobClient:
"""
A class that functions as a wrapper for the Ray Job Submission Client.

parameters:
address -- Either (1) the address of the Ray cluster, or (2) the HTTP address of the dashboard server on the head node, e.g. “http://<head-node-ip>:8265”. In case (1) it must be specified as an address that can be passed to ray.init(),
e.g. a Ray Client address (ray://<head_node_host>:10001), or “auto”, or “localhost:<port>”. If unspecified, will try to connect to a running local Ray cluster. This argument is always overridden by the RAY_ADDRESS environment variable.
create_cluster_if_needed -- Indicates whether the cluster at the specified address needs to already be running. Ray doesn't start a cluster before interacting with jobs, but third-party job managers may do so.
cookies -- Cookies to use when sending requests to the HTTP job server.
metadata -- Arbitrary metadata to store along with all jobs. New metadata specified per job will be merged with the global metadata provided here via a simple dict update.
headers -- Headers to use when sending requests to the HTTP job server, used for cases like authentication to a remote cluster.
verify -- Boolean indication to verify the server's TLS certificate or a path to a file or directory of trusted certificates. Default: True.
"""

def __init__(
self,
address: Optional[str] = None,
create_cluster_if_needed: bool = False,
cookies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
verify: Optional[Union[str, bool]] = True,
):
self.rayJobClient = JobSubmissionClient(
address=address,
create_cluster_if_needed=create_cluster_if_needed,
cookies=cookies,
metadata=metadata,
headers=headers,
verify=verify,
)

def submit_job(
self,
entrypoint: str,
job_id: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, str]] = None,
submission_id: Optional[str] = None,
entrypoint_num_cpus: Optional[Union[int, float]] = None,
entrypoint_num_gpus: Optional[Union[int, float]] = None,
entrypoint_resources: Optional[Dict[str, float]] = None,
) -> str:
"""
Method for submitting jobs to a Ray Cluster and returning the job id with entrypoint being a mandatory field.

Parameters:
entrypoint -- The shell command to run for this job.
submission_id -- A unique ID for this job.
runtime_env -- The runtime environment to install and run this job in.
metadata -- Arbitrary data to store along with this job.
job_id -- DEPRECATED. This has been renamed to submission_id
entrypoint_num_cpus -- The quantity of CPU cores to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.
entrypoint_num_gpus -- The quantity of GPUs to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.
entrypoint_resources -- The quantity of custom resources to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it.
"""
return self.rayJobClient.submit_job(
entrypoint=entrypoint,
job_id=job_id,
runtime_env=runtime_env,
metadata=metadata,
submission_id=submission_id,
entrypoint_num_cpus=entrypoint_num_cpus,
entrypoint_num_gpus=entrypoint_num_gpus,
entrypoint_resources=entrypoint_resources,
)

def delete_job(self, job_id: str) -> (bool, str):
"""
Method for deleting jobs with the job id being a mandatory field.
"""
deletion_status = self.rayJobClient.delete_job(job_id=job_id)

if deletion_status:
message = f"Successfully deleted Job {job_id}"
else:
message = f"Failed to delete Job {job_id}"

return deletion_status, message

def get_address(self) -> str:
"""
Method for getting the address from the RayJobClient
"""
return self.rayJobClient.get_address()

def get_job_info(self, job_id: str):
"""
Method for getting the job info with the job id being a mandatory field.
"""
return self.rayJobClient.get_job_info(job_id=job_id)

def get_job_logs(self, job_id: str) -> str:
"""
Method for getting the job logs with the job id being a mandatory field.
"""
return self.rayJobClient.get_job_logs(job_id=job_id)

def get_job_status(self, job_id: str) -> str:
"""
Method for getting the job's status with the job id being a mandatory field.
"""
return self.rayJobClient.get_job_status(job_id=job_id)

def list_jobs(self) -> List[JobDetails]:
"""
Method for getting a list of current jobs in the Ray Cluster.
"""
return self.rayJobClient.list_jobs()

def stop_job(self, job_id: str) -> (bool, str):
"""
Method for stopping a job with the job id being a mandatory field.
"""
stop_job_status = self.rayJobClient.stop_job(job_id=job_id)
if stop_job_status:
message = f"Successfully stopped Job {job_id}"
else:
message = f"Failed to stop Job, {job_id} could have already completed."
return stop_job_status, message

def tail_job_logs(self, job_id: str) -> Iterator[str]:
"""
Method for getting an iterator that follows the logs of a job with the job id being a mandatory field.
"""
return self.rayJobClient.tail_job_logs(job_id=job_id)
156 changes: 156 additions & 0 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
from unittest.mock import MagicMock
from pytest_mock import MockerFixture
from ray.job_submission import JobSubmissionClient
from codeflare_sdk.job.ray_jobs import RayJobClient

# For mocking openshift client results
fake_res = openshift.Result("fake")
Expand Down Expand Up @@ -2846,6 +2847,161 @@ def test_gen_app_wrapper_with_oauth(mocker: MockerFixture):
)


"""
Ray Jobs tests
"""


# rjc == RayJobClient
@pytest.fixture
def ray_job_client(mocker):
# Creating a fixture to instantiate RayJobClient with a mocked JobSubmissionClient
mocker.patch.object(JobSubmissionClient, "__init__", return_value=None)
return RayJobClient(
"https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org"
)


def test_rjc_submit_job(ray_job_client, mocker):
mocked_submit_job = mocker.patch.object(
JobSubmissionClient, "submit_job", return_value="mocked_submission_id"
)
submission_id = ray_job_client.submit_job(entrypoint={"pip": ["numpy"]})

mocked_submit_job.assert_called_once_with(
entrypoint={"pip": ["numpy"]},
job_id=None,
runtime_env=None,
metadata=None,
submission_id=None,
entrypoint_num_cpus=None,
entrypoint_num_gpus=None,
entrypoint_resources=None,
)

assert submission_id == "mocked_submission_id"


def test_rjc_delete_job(ray_job_client, mocker):
# Case return True
mocked_delete_job_True = mocker.patch.object(
JobSubmissionClient, "delete_job", return_value=True
)
result = ray_job_client.delete_job(job_id="mocked_job_id")

mocked_delete_job_True.assert_called_once_with(job_id="mocked_job_id")
assert result == (True, "Successfully deleted Job mocked_job_id")

# Case return False
mocked_delete_job_False = mocker.patch.object(
JobSubmissionClient, "delete_job", return_value=(False)
)
result = ray_job_client.delete_job(job_id="mocked_job_id")

mocked_delete_job_False.assert_called_once_with(job_id="mocked_job_id")
assert result == (False, "Failed to delete Job mocked_job_id")


def test_rjc_stop_job(ray_job_client, mocker):
# Case return True
mocked_stop_job_True = mocker.patch.object(
JobSubmissionClient, "stop_job", return_value=(True)
)
result = ray_job_client.stop_job(job_id="mocked_job_id")

mocked_stop_job_True.assert_called_once_with(job_id="mocked_job_id")
assert result == (True, "Successfully stopped Job mocked_job_id")

# Case return False
mocked_stop_job_False = mocker.patch.object(
JobSubmissionClient, "stop_job", return_value=(False)
)
result = ray_job_client.stop_job(job_id="mocked_job_id")

mocked_stop_job_False.assert_called_once_with(job_id="mocked_job_id")
assert result == (
False,
"Failed to stop Job, mocked_job_id could have already completed.",
)


def test_rjc_address(ray_job_client, mocker):
mocked_rjc_address = mocker.patch.object(
JobSubmissionClient,
"get_address",
return_value="https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org",
)
address = ray_job_client.get_address()

mocked_rjc_address.assert_called_once()
assert (
address
== "https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org"
)


def test_rjc_get_job_logs(ray_job_client, mocker):
mocked_rjc_get_job_logs = mocker.patch.object(
JobSubmissionClient, "get_job_logs", return_value="Logs"
)
logs = ray_job_client.get_job_logs(job_id="mocked_job_id")

mocked_rjc_get_job_logs.assert_called_once_with(job_id="mocked_job_id")
assert logs == "Logs"


def test_rjc_get_job_info(ray_job_client, mocker):
job_details_example = "JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id=None, submission_id='mocked_submission_id', driver_info=None, status=<JobStatus.PENDING: 'PENDING'>, entrypoint='python test.py', message='Job has not started yet. It may be waiting for the runtime environment to be set up.', error_type=None, start_time=1701271760641, end_time=None, metadata={}, runtime_env={'working_dir': 'gcs://_ray_pkg_67de6f0e60d43b19.zip', 'pip': {'packages': ['numpy'], 'pip_check': False}, '_ray_commit': 'b4bba4717f5ba04ee25580fe8f88eed63ef0c5dc'}, driver_agent_http_address=None, driver_node_id=None)"
mocked_rjc_get_job_info = mocker.patch.object(
JobSubmissionClient, "get_job_info", return_value=job_details_example
)
job_details = ray_job_client.get_job_info(job_id="mocked_job_id")

mocked_rjc_get_job_info.assert_called_once_with(job_id="mocked_job_id")
assert job_details == job_details_example


def test_rjc_get_job_status(ray_job_client, mocker):
job_status_example = "<JobStatus.PENDING: 'PENDING'>"
mocked_rjc_get_job_status = mocker.patch.object(
JobSubmissionClient, "get_job_status", return_value=job_status_example
)
job_status = ray_job_client.get_job_status(job_id="mocked_job_id")

mocked_rjc_get_job_status.assert_called_once_with(job_id="mocked_job_id")
assert job_status == job_status_example


def test_rjc_tail_job_logs(ray_job_client, mocker):
logs_example = [
"Job started...",
"Processing input data...",
"Finalizing results...",
"Job completed successfully.",
]
mocked_rjc_tail_job_logs = mocker.patch.object(
JobSubmissionClient, "tail_job_logs", return_value=logs_example
)
job_tail_job_logs = ray_job_client.tail_job_logs(job_id="mocked_job_id")

mocked_rjc_tail_job_logs.assert_called_once_with(job_id="mocked_job_id")
assert job_tail_job_logs == logs_example


def test_rjc_list_jobs(ray_job_client, mocker):
jobs_list = [
"JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id=None, submission_id='raysubmit_4k2NYS1YbRXYPZCM', driver_info=None, status=<JobStatus.SUCCEEDED: 'SUCCEEDED'>, entrypoint='python mnist.py', message='Job finished successfully.', error_type=None, start_time=1701352132585, end_time=1701352192002, metadata={}, runtime_env={'working_dir': 'gcs://_ray_pkg_6200b93a110e8033.zip', 'pip': {'packages': ['pytorch_lightning==1.5.10', 'ray_lightning', 'torchmetrics==0.9.1', 'torchvision==0.12.0'], 'pip_check': False}, '_ray_commit': 'b4bba4717f5ba04ee25580fe8f88eed63ef0c5dc'}, driver_agent_http_address='http://10.131.0.18:52365', driver_node_id='9fb515995f5fb13ad4db239ceea378333bebf0a2d45b6aa09d02e691')",
"JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id=None, submission_id='raysubmit_iRuwU8vdkbUZZGvT', driver_info=None, status=<JobStatus.STOPPED: 'STOPPED'>, entrypoint='python mnist.py', message='Job was intentionally stopped.', error_type=None, start_time=1701353096163, end_time=1701353097733, metadata={}, runtime_env={'working_dir': 'gcs://_ray_pkg_6200b93a110e8033.zip', 'pip': {'packages': ['pytorch_lightning==1.5.10', 'ray_lightning', 'torchmetrics==0.9.1', 'torchvision==0.12.0'], 'pip_check': False}, '_ray_commit': 'b4bba4717f5ba04ee25580fe8f88eed63ef0c5dc'}, driver_agent_http_address='http://10.131.0.18:52365', driver_node_id='9fb515995f5fb13ad4db239ceea378333bebf0a2d45b6aa09d02e691')",
]
mocked_rjc_list_jobs = mocker.patch.object(
JobSubmissionClient, "list_jobs", return_value=jobs_list
)
job_list_jobs = ray_job_client.list_jobs()

mocked_rjc_list_jobs.assert_called_once()
assert job_list_jobs == jobs_list


# Make sure to always keep this function last
def test_cleanup():
os.remove(f"{aw_dir}unit-test-cluster.yaml")
Expand Down