Skip to content

Commit

Permalink
[Datasets] Fix spread resource prefix tasks with no CPU requested. (#…
Browse files Browse the repository at this point in the history
…22017)

When applying the `_spread_resource_prefix` hack, don't treat CPU as a required resource if the task requests `num_cpus=0`.
  • Loading branch information
clarkzinzow committed Feb 1, 2022
1 parent 00e1ac3 commit b3fd3c6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
2 changes: 1 addition & 1 deletion python/ray/data/impl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _get_resource_request_labels(ray_remote_args: Dict[str, Any]):
task resource request defaults.
"""
resource_request_labels = set(ray_remote_args.get("resources", {}).keys())
if DEFAULT_REMOTE_FUNCTION_CPUS > 0:
if ray_remote_args.get("num_cpus", DEFAULT_REMOTE_FUNCTION_CPUS) > 0:
resource_request_labels.add("CPU")
if "num_gpus" in ray_remote_args:
resource_request_labels.add("GPU")
Expand Down
41 changes: 41 additions & 0 deletions python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3982,6 +3982,47 @@ def get_node_id():
assert set(locations) == {node1_id, node2_id}


def test_parquet_read_spread_no_cpus(ray_start_cluster, tmp_path):
    """Check spread-prefixed parquet reads when the read tasks request no CPUs.

    Builds a three-node cluster: one node carrying a "foo" resource, one
    carrying "bar:1", and one carrying "bar:2" that is added with
    ``num_cpus=0``. Two parquet files are written to ``tmp_path`` and read
    with ``ray_remote_args={"num_cpus": 0}`` and
    ``_spread_resource_prefix="bar:"``. The test asserts that exactly two
    blocks are produced and that the set of nodes holding those blocks equals
    the set of the two "bar:" nodes.
    """
    cluster = ray_start_cluster
    cluster.add_node(
        resources={"foo": 100}, _system_config={"max_direct_call_object_size": 0}
    )
    cluster.add_node(resources={"bar:1": 100})
    cluster.add_node(resources={"bar:2": 100}, num_cpus=0)

    ray.init(cluster.address)

    @ray.remote(num_cpus=0)
    def get_node_id():
        return ray.get_runtime_context().node_id.hex()

    # Pin one zero-CPU probe task to each "bar:" node to learn its node ID.
    expected_node_ids = {
        ray.get(get_node_id.options(resources={label: 1}).remote())
        for label in ("bar:1", "bar:2")
    }

    # Write two 100-row parquet files with disjoint value ranges.
    data_path = str(tmp_path)
    for file_name, start in (("test1.parquet", 0), ("test2.parquet", 300)):
        frame = pd.DataFrame(
            {
                "one": list(range(start, start + 100)),
                "two": list(range(start + 100, start + 200)),
            }
        )
        frame.to_parquet(os.path.join(data_path, file_name))

    ds = ray.data.read_parquet(
        data_path, ray_remote_args={"num_cpus": 0}, _spread_resource_prefix="bar:"
    )

    # Force reads.
    block_refs = ds.get_internal_block_refs()
    assert len(block_refs) == 2

    # Wait for every block without fetching it to the local node.
    ray.wait(block_refs, num_returns=len(block_refs), fetch_local=False)
    location_data = ray.experimental.get_object_locations(block_refs)
    observed_node_ids = {
        node_id
        for block_ref in block_refs
        for node_id in location_data[block_ref]["node_ids"]
    }
    assert observed_node_ids == expected_node_ids


@pytest.mark.parametrize("num_items,parallelism", [(100, 1), (1000, 4)])
def test_sort_arrow(ray_start_regular, num_items, parallelism):
a = list(reversed(range(num_items)))
Expand Down

0 comments on commit b3fd3c6

Please sign in to comment.