Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spark] Add heap_memory param for setup_ray_cluster API, and change default value of per ray worker node config, and change default value of ray head node config for global Ray cluster #42604

Merged
merged 24 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions python/ray/tests/spark/test_GPU.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def test_gpu_allocation(self):
num_task_slots=num_ray_task_slots,
physical_mem_bytes=_RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES,
shared_mem_bytes=_RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES,
configured_heap_memory_bytes=None,
configured_object_store_bytes=None,
)

Expand Down Expand Up @@ -135,6 +136,7 @@ def test_gpu_autoscaling(self):
num_task_slots=num_ray_task_slots,
physical_mem_bytes=_RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES,
shared_mem_bytes=_RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES,
configured_heap_memory_bytes=None,
configured_object_store_bytes=None,
)

Expand Down Expand Up @@ -181,6 +183,16 @@ def f(x):
retry_interval_ms=1000,
)

def test_default_resource_allocation(self):
    """Verify default per-worker resource allocation for a Ray-on-Spark cluster.

    Starts a cluster with ``max_worker_nodes=1`` and no explicit
    ``num_cpus_worker_node`` / ``num_gpus_worker_node`` arguments, so the
    single Ray worker node should receive every CPU and every GPU available
    to the backing Spark cluster.
    """
    with _setup_ray_cluster(
        max_worker_nodes=1,
        head_node_options={"include_dashboard": False},
    ):
        ray.init()
        worker_res_list = self.get_ray_worker_resources_list()
        # Bug fix: the original asserts were cross-swapped (CPU compared to
        # num_total_gpus and GPU to num_total_cpus). That only passed because
        # the GPU test base happens to configure equal CPU and GPU totals.
        assert worker_res_list[0]["CPU"] == self.num_total_cpus
        assert worker_res_list[0]["GPU"] == self.num_total_gpus


class TestBasicSparkGPUCluster(RayOnSparkGPUClusterTestBase):
@classmethod
Expand Down
20 changes: 18 additions & 2 deletions python/ray/tests/spark/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ def _setup_ray_cluster(*args, **kwds):
class RayOnSparkCPUClusterTestBase(ABC):
spark = None
num_total_cpus = None
num_total_gpus = None
num_cpus_per_spark_task = None
num_gpus_per_spark_task = None
max_spark_tasks = None

@classmethod
Expand Down Expand Up @@ -118,11 +120,13 @@ def test_cpu_allocation(self):
num_task_slots=num_ray_task_slots,
physical_mem_bytes=_RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES,
shared_mem_bytes=_RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES,
configured_heap_memory_bytes=None,
configured_object_store_bytes=None,
)
with _setup_ray_cluster(
max_worker_nodes=max_worker_nodes_arg,
num_cpus_worker_node=num_cpus_worker_node,
num_gpus_worker_node=0,
head_node_options={"include_dashboard": False},
):
ray.init()
Expand All @@ -144,6 +148,8 @@ def test_public_api(self):
shutil.rmtree(collect_log_to_path, ignore_errors=True)
setup_ray_cluster(
max_worker_nodes=MAX_NUM_WORKER_NODES,
num_cpus_worker_node=1,
num_gpus_worker_node=0,
collect_log_to_path=collect_log_to_path,
ray_temp_root_dir=ray_temp_root_dir,
head_node_options={"include_dashboard": True},
Expand Down Expand Up @@ -191,7 +197,11 @@ def f(x):
shutil.rmtree(collect_log_to_path, ignore_errors=True)

def test_ray_cluster_shutdown(self):
with _setup_ray_cluster(max_worker_nodes=self.max_spark_tasks) as cluster:
with _setup_ray_cluster(
max_worker_nodes=self.max_spark_tasks,
num_cpus_worker_node=1,
num_gpus_worker_node=0,
) as cluster:
ray.init()
assert len(self.get_ray_worker_resources_list()) == self.max_spark_tasks

Expand All @@ -207,7 +217,11 @@ def test_ray_cluster_shutdown(self):
assert not is_port_in_use(hostname, int(port))

def test_background_spark_job_exit_trigger_ray_head_exit(self):
with _setup_ray_cluster(max_worker_nodes=self.max_spark_tasks) as cluster:
with _setup_ray_cluster(
max_worker_nodes=self.max_spark_tasks,
num_cpus_worker_node=1,
num_gpus_worker_node=0,
) as cluster:
ray.init()
# Mimic the case the job failed unexpectedly.
cluster._cancel_background_spark_job()
Expand Down Expand Up @@ -235,13 +249,15 @@ def test_autoscaling(self):
num_task_slots=num_ray_task_slots,
physical_mem_bytes=_RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES,
shared_mem_bytes=_RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES,
configured_heap_memory_bytes=None,
configured_object_store_bytes=None,
)

with _setup_ray_cluster(
max_worker_nodes=max_worker_nodes,
min_worker_nodes=min_worker_nodes,
num_cpus_worker_node=num_cpus_worker_node,
num_gpus_worker_node=0,
head_node_options={"include_dashboard": False},
autoscale_idle_timeout_minutes=0.1,
):
Expand Down
6 changes: 4 additions & 2 deletions python/ray/tests/spark/test_databricks_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ def test_hook(self, monkeypatch):
"ray.util.spark.databricks_hook.get_db_entry_point", lambda: db_api_entry
)
monkeypatch.setattr(
"ray.util.spark.databricks_hook.get_databricks_function",
lambda *args, **kwargs: None,
"ray.util.spark.databricks_hook.get_databricks_display_html_function",
lambda: lambda x: print(x),
)
try:
setup_ray_cluster(
max_worker_nodes=2,
num_cpus_worker_node=1,
num_gpus_worker_node=0,
head_node_options={"include_dashboard": False},
)
cluster = ray.util.spark.cluster_init._active_ray_cluster
Expand Down
25 changes: 21 additions & 4 deletions python/ray/tests/spark/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,31 @@ def test_get_spark_task_assigned_physical_gpus():

@patch("ray._private.ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES", 1)
def test_calc_mem_per_ray_worker_node():
    """Exercise ``_calc_mem_per_ray_worker_node`` memory splitting.

    The helper takes
    ``(num_task_slots, physical_mem_bytes, shared_mem_bytes,
    configured_heap_memory_bytes, configured_object_store_bytes)`` and
    returns ``(heap_memory, object_store_memory, warning)``.

    NOTE(review): the scraped diff interleaved the old 4-argument call lines
    with the updated 5-argument ones, leaving duplicate broken asserts; this
    reconstructs the intended post-change test body.
    """
    # Explicit object-store request honored; heap gets the remainder.
    assert _calc_mem_per_ray_worker_node(4, 1000000, 400000, None, 100000) == (
        120000,
        80000,
        None,
    )
    assert _calc_mem_per_ray_worker_node(4, 1000000, 400000, None, 70000) == (
        130000,
        70000,
        None,
    )
    # No explicit object-store request: derived from shared memory.
    assert _calc_mem_per_ray_worker_node(4, 1000000, 400000, None, None) == (
        140000,
        60000,
        None,
    )
    assert _calc_mem_per_ray_worker_node(4, 1000000, 200000, None, None) == (
        160000,
        40000,
        None,
    )
    # Both heap and object-store explicitly configured: used as given.
    assert _calc_mem_per_ray_worker_node(4, 1000000, 400000, 150000, 70000) == (
        150000,
        70000,
        None,
    )


@patch("ray._private.ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES", 1)
Expand All @@ -64,33 +69,45 @@ def test_get_avail_mem_per_ray_worker_node(monkeypatch):
assert _get_avail_mem_per_ray_worker_node(
num_cpus_per_node=1,
num_gpus_per_node=2,
heap_memory_per_node=None,
object_store_memory_per_node=None,
) == (140000, 60000, None, None)

assert _get_avail_mem_per_ray_worker_node(
num_cpus_per_node=1,
num_gpus_per_node=2,
heap_memory_per_node=None,
object_store_memory_per_node=80000,
) == (120000, 80000, None, None)

assert _get_avail_mem_per_ray_worker_node(
num_cpus_per_node=1,
num_gpus_per_node=2,
heap_memory_per_node=None,
object_store_memory_per_node=120000,
) == (100000, 100000, None, None)

assert _get_avail_mem_per_ray_worker_node(
num_cpus_per_node=2,
num_gpus_per_node=2,
heap_memory_per_node=None,
object_store_memory_per_node=None,
) == (280000, 120000, None, None)

assert _get_avail_mem_per_ray_worker_node(
num_cpus_per_node=1,
num_gpus_per_node=4,
heap_memory_per_node=None,
object_store_memory_per_node=None,
) == (280000, 120000, None, None)

assert _get_avail_mem_per_ray_worker_node(
num_cpus_per_node=1,
num_gpus_per_node=2,
heap_memory_per_node=150000,
object_store_memory_per_node=70000,
) == (150000, 70000, None, None)


def test_convert_ray_node_options():
assert _convert_ray_node_options(
Expand Down