From 1f2241e0db6a856f44a8b6715c7c9fe9a8d36901 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Wed, 27 Aug 2025 18:31:33 +0100 Subject: [PATCH] RHOAIENG-30720: Remove GCS FT for Lifecycled RayClusters --- codecov.yml | 1 - src/codeflare_sdk/common/utils/constants.py | 1 + src/codeflare_sdk/ray/rayjobs/config.py | 68 +------------------- src/codeflare_sdk/ray/rayjobs/rayjob.py | 14 ++-- src/codeflare_sdk/ray/rayjobs/test_rayjob.py | 39 ++--------- 5 files changed, 16 insertions(+), 107 deletions(-) diff --git a/codecov.yml b/codecov.yml index fab28aee..4494dcd4 100644 --- a/codecov.yml +++ b/codecov.yml @@ -15,4 +15,3 @@ coverage: default: target: 85% threshold: 2.5% - diff --git a/src/codeflare_sdk/common/utils/constants.py b/src/codeflare_sdk/common/utils/constants.py index 9721ac85..133856cd 100644 --- a/src/codeflare_sdk/common/utils/constants.py +++ b/src/codeflare_sdk/common/utils/constants.py @@ -1,3 +1,4 @@ RAY_VERSION = "2.47.1" # Below references ray:2.47.1-py311-cu121 CUDA_RUNTIME_IMAGE = "quay.io/modh/ray@sha256:6d076aeb38ab3c34a6a2ef0f58dc667089aa15826fa08a73273c629333e12f1e" +MOUNT_PATH = "/home/ray/scripts" diff --git a/src/codeflare_sdk/ray/rayjobs/config.py b/src/codeflare_sdk/ray/rayjobs/config.py index 849d7997..35842536 100644 --- a/src/codeflare_sdk/ray/rayjobs/config.py +++ b/src/codeflare_sdk/ray/rayjobs/config.py @@ -40,7 +40,7 @@ ) import logging -from ...common.utils.constants import CUDA_RUNTIME_IMAGE, RAY_VERSION +from ...common.utils.constants import CUDA_RUNTIME_IMAGE, MOUNT_PATH, RAY_VERSION logger = logging.getLogger(__name__) @@ -139,14 +139,6 @@ class ManagedClusterConfig: A list of V1Volume objects to add to the Cluster volume_mounts: A list of V1VolumeMount objects to add to the Cluster - enable_gcs_ft: - A boolean indicating whether to enable GCS fault tolerance. - redis_address: - The address of the Redis server to use for GCS fault tolerance, required when enable_gcs_ft is True. - redis_password_secret: - Kubernetes secret reference containing Redis password. ex: {"name": "secret-name", "key": "password-key"} - external_storage_namespace: - The storage namespace to use for GCS fault tolerance. By default, KubeRay sets it to the UID of RayCluster. """ head_cpu_requests: Union[int, str] = 2 @@ -173,35 +165,10 @@ class ManagedClusterConfig: annotations: Dict[str, str] = field(default_factory=dict) volumes: list[V1Volume] = field(default_factory=list) volume_mounts: list[V1VolumeMount] = field(default_factory=list) - enable_gcs_ft: bool = False - redis_address: Optional[str] = None - redis_password_secret: Optional[Dict[str, str]] = None - external_storage_namespace: Optional[str] = None def __post_init__(self): self.envs["RAY_USAGE_STATS_ENABLED"] = "0" - if self.enable_gcs_ft: - if not self.redis_address: - raise ValueError( - "redis_address must be provided when enable_gcs_ft is True" - ) - - if self.redis_password_secret and not isinstance( - self.redis_password_secret, dict - ): - raise ValueError( - "redis_password_secret must be a dictionary with 'name' and 'key' fields" - ) - - if self.redis_password_secret and ( - "name" not in self.redis_password_secret - or "key" not in self.redis_password_secret - ): - raise ValueError( - "redis_password_secret must contain both 'name' and 'key' fields" - ) - self._validate_types() self._memory_to_string() self._validate_gpu_config(self.head_accelerators) @@ -286,11 +253,6 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]: "workerGroupSpecs": [self._build_worker_group_spec(cluster_name)], } - # Add GCS fault tolerance if enabled - if self.enable_gcs_ft: - gcs_ft_options = self._build_gcs_ft_options() - ray_cluster_spec["gcsFaultToleranceOptions"] = gcs_ft_options - return ray_cluster_spec def _build_head_group_spec(self) -> Dict[str, Any]: @@ -494,31 +456,7 @@ def _build_env_vars(self) -> list: """Build environment variables list.""" return [V1EnvVar(name=key, value=value) for key, value in self.envs.items()] - def _build_gcs_ft_options(self) -> Dict[str, Any]: - """Build GCS fault tolerance options.""" - gcs_ft_options = {"redisAddress": self.redis_address} - - if ( - hasattr(self, "external_storage_namespace") - and self.external_storage_namespace - ): - gcs_ft_options["externalStorageNamespace"] = self.external_storage_namespace - - if hasattr(self, "redis_password_secret") and self.redis_password_secret: - gcs_ft_options["redisPassword"] = { - "valueFrom": { - "secretKeyRef": { - "name": self.redis_password_secret["name"], - "key": self.redis_password_secret["key"], - } - } - } - - return gcs_ft_options - - def add_script_volumes( - self, configmap_name: str, mount_path: str = "/home/ray/scripts" - ): + def add_script_volumes(self, configmap_name: str, mount_path: str = MOUNT_PATH): """ Add script volume and mount references to cluster configuration. @@ -590,7 +528,7 @@ def build_script_configmap_spec( } def build_script_volume_specs( - self, configmap_name: str, mount_path: str = "/home/ray/scripts" + self, configmap_name: str, mount_path: str = MOUNT_PATH ) -> Tuple[Dict[str, Any], Dict[str, Any]]: """ Build volume and mount specifications for scripts diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py index acf93fdc..072f5153 100644 --- a/src/codeflare_sdk/ray/rayjobs/rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py @@ -22,6 +22,7 @@ import re import ast from typing import Dict, Any, Optional, Tuple +from codeflare_sdk.common.utils.constants import MOUNT_PATH from kubernetes import client from ...common.kubernetes_cluster.auth import get_api_client from python_client.kuberay_job_api import RayjobApi @@ -41,8 +42,6 @@ logger = logging.getLogger(__name__) -mount_path = "/home/ray/scripts" - class RayJob: """ @@ -354,7 +353,6 @@ def _extract_script_files_from_entrypoint(self) -> Optional[Dict[str, str]]: return None scripts = {} - # mount_path = "/home/ray/scripts" processed_files = set() # Avoid infinite loops # Look for Python file patterns in entrypoint (e.g., "python script.py", "python /path/to/script.py") @@ -364,14 +362,14 @@ def _extract_script_files_from_entrypoint(self) -> Optional[Dict[str, str]]: # Process main scripts from entrypoint files for script_path in matches: self._process_script_and_imports( - script_path, scripts, mount_path, processed_files + script_path, scripts, MOUNT_PATH, processed_files ) # Update entrypoint paths to use mounted locations for script_path in matches: if script_path in [os.path.basename(s) for s in processed_files]: old_path = script_path - new_path = f"{mount_path}/{os.path.basename(script_path)}" + new_path = f"{MOUNT_PATH}/{os.path.basename(script_path)}" self.entrypoint = self.entrypoint.replace(old_path, new_path) return scripts if scripts else None @@ -466,7 +464,7 @@ def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]): # Add volumes to cluster config (config.py handles spec building) self._cluster_config.add_script_volumes( - configmap_name=configmap_name, mount_path="/home/ray/scripts" + configmap_name=configmap_name, mount_path=MOUNT_PATH ) def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]): @@ -541,8 +539,6 @@ def _update_existing_cluster_for_scripts( config_builder: ManagedClusterConfig instance for building specs """ - # Get existing RayCluster - api_instance = client.CustomObjectsApi(get_api_client()) try: ray_cluster = self._cluster_api.get_ray_cluster( name=self.cluster_name, @@ -553,7 +549,7 @@ def _update_existing_cluster_for_scripts( # Build script volume and mount specifications using config.py script_volume, script_mount = config_builder.build_script_volume_specs( - configmap_name=configmap_name, mount_path="/home/ray/scripts" + configmap_name=configmap_name, mount_path=MOUNT_PATH ) # Helper function to check for duplicate volumes/mounts diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index ff7a2639..9b87cec5 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -15,7 +15,7 @@ import pytest import os from unittest.mock import MagicMock, patch -from codeflare_sdk.common.utils.constants import CUDA_RUNTIME_IMAGE, RAY_VERSION +from codeflare_sdk.common.utils.constants import MOUNT_PATH, RAY_VERSION from codeflare_sdk.ray.rayjobs.rayjob import RayJob from codeflare_sdk.ray.cluster.config import ClusterConfiguration @@ -1007,27 +1007,6 @@ def test_rayjob_user_override_shutdown_behavior(mocker): assert rayjob_override_priority.shutdown_after_job_finishes is True -def test_build_ray_cluster_spec_with_gcs_ft(mocker): - """Test build_ray_cluster_spec with GCS fault tolerance enabled.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - - # Create a test cluster config with GCS FT enabled - cluster_config = ManagedClusterConfig( - enable_gcs_ft=True, - redis_address="redis://redis-service:6379", - external_storage_namespace="storage-ns", - ) - - # Build the spec using the method on the cluster config - spec = cluster_config.build_ray_cluster_spec("test-cluster") - - # Verify GCS fault tolerance options - assert "gcsFaultToleranceOptions" in spec - gcs_ft = spec["gcsFaultToleranceOptions"] - assert gcs_ft["redisAddress"] == "redis://redis-service:6379" - assert gcs_ft["externalStorageNamespace"] == "storage-ns" - - class TestRayVersionValidation: """Test Ray version validation in RayJob.""" @@ -1210,7 +1189,7 @@ def test_extract_script_files_from_entrypoint_single_script(mocker, tmp_path): assert scripts is not None assert test_script.name in scripts assert scripts[test_script.name] == "print('Hello World!')" - assert f"/home/ray/scripts/{test_script.name}" in rayjob.entrypoint + assert f"{MOUNT_PATH}/{test_script.name}" in rayjob.entrypoint finally: os.chdir(original_cwd) @@ -1377,7 +1356,7 @@ def test_add_script_volumes(): assert volume.config_map.name == "test-scripts" assert mount.name == "ray-job-scripts" - assert mount.mount_path == "/home/ray/scripts" + assert mount.mount_path == MOUNT_PATH def test_add_script_volumes_duplicate_prevention(): @@ -1619,7 +1598,7 @@ def test_rayjob_submit_with_scripts_new_cluster(mocker, tmp_path): assert len(cluster_config.volume_mounts) == 1 # Verify entrypoint was updated - assert "/home/ray/scripts/test.py" in rayjob.entrypoint + assert f"{MOUNT_PATH}/test.py" in rayjob.entrypoint finally: os.chdir(original_cwd) @@ -1645,9 +1624,7 @@ def test_process_script_and_imports_io_error(mocker, tmp_path): mocker.patch("builtins.open", side_effect=IOError("Permission denied")) # Should handle the error gracefully and not crash - rayjob._process_script_and_imports( - "test.py", scripts, "/home/ray/scripts", processed_files - ) + rayjob._process_script_and_imports("test.py", scripts, MOUNT_PATH, processed_files) # Should add to processed_files but not to scripts (due to error) assert "test.py" in processed_files @@ -1671,7 +1648,7 @@ def test_process_script_and_imports_container_path_skip(mocker): # Test script path already in container rayjob._process_script_and_imports( - "/home/ray/scripts/test.py", scripts, "/home/ray/scripts", processed_files + f"{MOUNT_PATH}/test.py", scripts, MOUNT_PATH, processed_files ) # Should skip processing @@ -1695,9 +1672,7 @@ def test_process_script_and_imports_already_processed(mocker, tmp_path): processed_files = {"test.py"} # Already processed # Should return early without processing - rayjob._process_script_and_imports( - "test.py", scripts, "/home/ray/scripts", processed_files - ) + rayjob._process_script_and_imports("test.py", scripts, MOUNT_PATH, processed_files) # Should remain unchanged assert len(scripts) == 0