
[Train] Support Accelerator Type in ScalingConfig #43090

Merged
Changes from 20 commits
44 changes: 34 additions & 10 deletions python/ray/air/config.py
@@ -21,6 +21,7 @@
from ray.util.annotations import PublicAPI, Deprecated
from ray.widgets import Template, make_table_html_repr
from ray.data.preprocessor import Preprocessor
from ray._private.ray_constants import RESOURCE_CONSTRAINT_PREFIX

if TYPE_CHECKING:
from ray.tune.callback import Callback
@@ -125,6 +126,12 @@ class ScalingConfig:
placement_strategy: The placement strategy to use for the
placement group of the Ray actors. See :ref:`Placement Group
Strategies <pgroup-strategy>` for the possible options.
accelerator_type: [Experimental] If specified, Ray Train will launch the
training coordinator and workers on the nodes with the specified type
of accelerators.
See :ref:`the available accelerator types <accelerator_types>`.
Ensure that your cluster has instances with the specified accelerator type
or is able to autoscale to fulfill the request.

Example:

@@ -149,6 +156,7 @@ class ScalingConfig:
use_gpu: Union[bool, SampleRange] = False
resources_per_worker: Optional[Union[Dict, SampleRange]] = None
placement_strategy: Union[str, SampleRange] = "PACK"
accelerator_type: Optional[str] = None

def __post_init__(self):
if self.resources_per_worker:
@@ -185,14 +193,21 @@ def _resources_per_worker_not_none(self):
# Note that we don't request any CPUs, which avoids possible
# scheduling contention. Generally nodes have many more CPUs than
# GPUs, so not requesting a CPU does not lead to oversubscription.
-                return {"GPU": 1}
+                resources_per_worker = {"GPU": 1}
            else:
-                return {"CPU": 1}
-        resources_per_worker = {
-            k: v for k, v in self.resources_per_worker.items() if v != 0
-        }
+                resources_per_worker = {"CPU": 1}
+        else:
+            resources_per_worker = {
Comment on lines +196 to +200

Contributor:
nit: I think we can clean up this branching logic a bit now since we don't return early anymore, but we can do it in a separate PR...

Member (author):
OK. Let's merge it first. I'll post a follow-up PR to remove the branching for cases like colab and num_workers=None, etc.

+                k: v for k, v in self.resources_per_worker.items() if v != 0
+            }
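
A hypothetical sketch of the branching cleanup suggested in the thread above (a possible follow-up, not part of this diff): the early-return branches could collapse into a single expression.

if self.resources_per_worker is None:
    # Hypothetical follow-up: pick the single-resource default inline.
    resources_per_worker = {"GPU": 1} if self.use_gpu else {"CPU": 1}
else:
    resources_per_worker = {
        k: v for k, v in self.resources_per_worker.items() if v != 0
    }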

if self.use_gpu:
resources_per_worker.setdefault("GPU", 1)

+        if self.accelerator_type:
+            resources_per_worker[
+                f"{RESOURCE_CONSTRAINT_PREFIX}{self.accelerator_type}"
+            ] = 0.001
Contributor:
Should we use setdefault similar to GPU in the line above, in case the user manually sets resources_per_worker? Or validate that it's not set in the resources dict, if we want this to be the only interface where the user can specify it.

Member (author):
The "accelerator_type:{type}" key should only be used as an internal API; users should not specify it in resources_per_worker, since ScalingConfig(accelerator_type=) is provided as the only entry point.

For example, if a user needs two A100 GPUs, they can do:

ScalingConfig(
    num_workers=...,
    resources_per_worker={"GPU": 2},
    accelerator_type="A100",
)

But I agree with changing it to setdefault to provide more flexibility for advanced use cases.
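
For concreteness, a minimal sketch of the setdefault variant being agreed on here (illustrative only; whether it landed exactly like this is not shown in this diff):

if self.accelerator_type:
    # Only add the accelerator constraint if the user has not already set it
    # explicitly in resources_per_worker.
    resources_per_worker.setdefault(
        f"{RESOURCE_CONSTRAINT_PREFIX}{self.accelerator_type}", 0.001
    )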

return resources_per_worker

@property
@@ -208,16 +223,25 @@ def _trainer_resources_not_none(self):
try:
import google.colab # noqa: F401

-                    trainer_resources = 0
+                    trainer_num_cpus = 0
                except ImportError:
-                    trainer_resources = 1
+                    trainer_num_cpus = 1
else:
# If there are no additional workers, then always reserve 1 CPU for
# the Trainer.
-                trainer_resources = 1
+                trainer_num_cpus = 1

return {"CPU": trainer_resources}
return {k: v for k, v in self.trainer_resources.items() if v != 0}
trainer_resources = {"CPU": trainer_num_cpus}
else:
trainer_resources = {
k: v for k, v in self.trainer_resources.items() if v != 0
}

+        if self.accelerator_type:
+            trainer_resources[
+                f"{RESOURCE_CONSTRAINT_PREFIX}{self.accelerator_type}"
+            ] = 0.001
Contributor:
Do we need to explicitly set it here too? Won't it get merged with rank 0?

Member (author):
Nice catch. This shouldn't be needed here now that the previous colocation PR has been merged.

+        return trainer_resources

@property
def total_resources(self):
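Taken together, the config.py changes let a user pin training to a specific accelerator directly from ScalingConfig. A minimal usage sketch (TorchTrainer and the A100 type are illustrative choices; this assumes a cluster that has, or can autoscale, matching GPU nodes):

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker():
    ...  # model setup and training steps go here

# Each worker requests 1 GPU plus the tiny internal "accelerator_type:A100"
# constraint added by ScalingConfig, so all workers land on A100 nodes.
trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(
        num_workers=4,
        use_gpu=True,
        accelerator_type="A100",
    ),
)
result = trainer.fit()

The 0.001 amount is deliberately tiny, presumably so that many workers can share a node's accelerator label rather than consuming it as a real resource.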
69 changes: 69 additions & 0 deletions python/ray/air/tests/test_api.py
@@ -140,6 +140,75 @@ def test_scaling_config_validate_config_bad_allowed_keys():
assert "are not present in" in str(exc_info.value)


def test_scaling_config_accelerator_type():
# Basic
scaling_config = ScalingConfig(num_workers=2, use_gpu=True, accelerator_type="A100")
assert scaling_config.accelerator_type == "A100"
assert scaling_config._trainer_resources_not_none == {
"CPU": 1,
"accelerator_type:A100": 0.001,
}
assert scaling_config._resources_per_worker_not_none == {
"GPU": 1,
"accelerator_type:A100": 0.001,
}
assert scaling_config.additional_resources_per_worker == {
"accelerator_type:A100": 0.001
}
assert scaling_config.as_placement_group_factory().bundles == [
{"GPU": 1, "accelerator_type:A100": 0.002, "CPU": 1},
{"GPU": 1, "accelerator_type:A100": 0.001},
]

# With resources_per_worker
scaling_config = ScalingConfig(
num_workers=2,
use_gpu=True,
accelerator_type="A100",
resources_per_worker={"custom_resource": 1},
)
assert scaling_config._trainer_resources_not_none == {
"CPU": 1,
"accelerator_type:A100": 0.001,
}
assert scaling_config._resources_per_worker_not_none == {
"GPU": 1,
"custom_resource": 1,
"accelerator_type:A100": 0.001,
}
assert scaling_config.additional_resources_per_worker == {
"custom_resource": 1,
"accelerator_type:A100": 0.001,
}
assert scaling_config.as_placement_group_factory().bundles == [
{"GPU": 1, "custom_resource": 1, "accelerator_type:A100": 0.002, "CPU": 1},
{"GPU": 1, "custom_resource": 1, "accelerator_type:A100": 0.001},
]

# With trainer_resources
scaling_config = ScalingConfig(
num_workers=2,
use_gpu=True,
accelerator_type="A100",
trainer_resources={"memory": 10 * 1024**3},
)
assert scaling_config._trainer_resources_not_none == {
"memory": 10 * 1024**3,
"accelerator_type:A100": 0.001,
}
assert scaling_config._resources_per_worker_not_none == {
"GPU": 1,
"accelerator_type:A100": 0.001,
}
assert scaling_config.additional_resources_per_worker == {
"accelerator_type:A100": 0.001
}
assert scaling_config.as_placement_group_factory().bundles == [
{"GPU": 1, "accelerator_type:A100": 0.002, "memory": 10 * 1024**3},
{"GPU": 1, "accelerator_type:A100": 0.001},
]
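
The 0.002 in the first bundle of each expected placement group above is just the trainer's 0.001 plus the rank-0 worker's 0.001, since the rank-0 bundle combines trainer and worker resources. A rough sketch of that merge (assumed behavior for illustration, not the actual Tune/Train implementation):

def merge_bundles(a: dict, b: dict) -> dict:
    # Sum resource amounts key by key, the way the rank-0 bundle folds
    # the trainer's resources into the first worker's bundle.
    merged = dict(a)
    for key, amount in b.items():
        merged[key] = merged.get(key, 0) + amount
    return merged

merge_bundles(
    {"CPU": 1, "accelerator_type:A100": 0.001},  # trainer
    {"GPU": 1, "accelerator_type:A100": 0.001},  # rank-0 worker
)
# -> {"CPU": 1, "accelerator_type:A100": 0.002, "GPU": 1}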


@pytest.mark.parametrize(
"trainer_resources", [None, {}, {"CPU": 1}, {"CPU": 2, "GPU": 1}, {"CPU": 0}]
)
11 changes: 4 additions & 7 deletions python/ray/train/_internal/worker_group.py
@@ -385,13 +385,10 @@ def sort_workers_by_ip_and_gpu_id(self, _first_ip: Optional[str] = None):

Args:
_first_ip: The first IP to group by.
-                Hack to avoid OOMs.
-                This is just a temporary solution for Train loading entire checkpoints
-                into memory by ensuring that the rank 0 worker is on the same node as
-                trainable, thus allowing for lazy checkpoint transfer to be used.
-                See https://github.com/ray-project/ray/issues/33073
-                for more context.
-                TODO remove this argument.
+                Set this to the node IP of the trainer coordinator to ensure that the
+                rank 0 worker is on the same node, allowing additional resources to
+                be specified for rank 0 workers via
+                `ScalingConfig(trainer_resources=)`.
"""
ip_to_workers = defaultdict(list)

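For intuition, a standalone sketch of the "group workers by node IP, chosen IP first" ordering the docstring describes (names and structure are illustrative, not the Train internals):

from collections import defaultdict

def sort_workers_by_ip(worker_ips, first_ip=None):
    # Group worker indices by node IP so co-located workers get consecutive
    # ranks; if first_ip is given, its group comes first so that rank 0
    # lands on that node (e.g. the trainer coordinator's node).
    ip_to_workers = defaultdict(list)
    if first_ip is not None:
        ip_to_workers[first_ip] = []
    for rank, ip in enumerate(worker_ips):
        ip_to_workers[ip].append(rank)
    new_order = []
    for workers in ip_to_workers.values():
        new_order.extend(workers)
    return new_order

sort_workers_by_ip(["10.0.0.1", "10.0.0.2", "10.0.0.1"], first_ip="10.0.0.2")
# -> [1, 0, 2]: the worker on 10.0.0.2 becomes rank 0.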
1 change: 1 addition & 0 deletions python/ray/train/data_parallel_trainer.py
@@ -210,6 +210,7 @@ def __init__(self, train_loop_per_worker, my_backend_config:
"resources_per_worker",
"use_gpu",
"placement_strategy",
"accelerator_type",
]

# For backwards compatibility with the legacy dataset config API.
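The new "accelerator_type" entry extends the list of ScalingConfig fields that DataParallelTrainer accepts. A rough sketch of the kind of allowed-keys check this list feeds into — the one behind the "are not present in" error asserted in the tests above (illustrative, not the actual Train implementation):

from dataclasses import fields
from ray.train import ScalingConfig

def validate_scaling_config(scaling_config: ScalingConfig, allowed_keys: list) -> None:
    # Flag any field that differs from its default but is not allowed by this trainer.
    defaults = ScalingConfig()
    bad_keys = [
        f.name
        for f in fields(scaling_config)
        if getattr(scaling_config, f.name) != getattr(defaults, f.name)
        and f.name not in allowed_keys
    ]
    if bad_keys:
        raise ValueError(f"{bad_keys} are not present in {allowed_keys}")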
56 changes: 56 additions & 0 deletions python/ray/train/tests/test_data_parallel_trainer.py
@@ -6,6 +6,8 @@

import ray
from ray import train, tune
from ray._private.ray_constants import RESOURCE_CONSTRAINT_PREFIX
from ray.cluster_utils import Cluster
from ray.train import RunConfig, ScalingConfig
from ray.train._internal.backend_executor import BackendExecutor
from ray.train._internal.worker_group import WorkerGroup
@@ -15,6 +17,7 @@
from ray.tune.callback import Callback
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner
from ray.util.accelerators import NVIDIA_A100, NVIDIA_TESLA_A10G


@pytest.fixture
@@ -33,6 +36,34 @@ def ray_start_4_cpus_4_gpus_4_extra():
ray.shutdown()


@pytest.fixture
def ray_start_heterogenous_cluster():
"""
Start a heterogenous cluster with 6 nodes:
- 2 node with 4 x A100
- 2 node with 4 x A10G
- 2 node with 4 x GPU without accelerator_type
"""
cluster = Cluster()

for accelerator_type in [NVIDIA_A100, NVIDIA_TESLA_A10G, None]:
for _ in range(2):
cluster.add_node(
num_cpus=4,
num_gpus=4,
resources={f"{RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}": 4}
if accelerator_type
else {},
)

ray.init(address=cluster.address)

yield

ray.shutdown()
cluster.shutdown()
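
As a sanity check on the fixture (hypothetical usage, not part of the PR), the accelerator constraint labels show up as ordinary custom resources on the cluster, 4 per node for each of the 2 nodes of a given type:

# Inside a test using the fixture:
resources = ray.cluster_resources()
assert resources.get(f"{RESOURCE_CONSTRAINT_PREFIX}{NVIDIA_A100}", 0) == 8
assert resources.get(f"{RESOURCE_CONSTRAINT_PREFIX}{NVIDIA_TESLA_A10G}", 0) == 8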


def gen_execute_single_async_special(special_f):
def execute_single_async_special(self, i, f, *args, **kwargs):
assert len(self.workers) == 2
@@ -320,6 +351,31 @@ def get_resources():
assert visible_devices == ["0,1,2,3", "0,1,2,3"]


@pytest.mark.parametrize("num_gpus", [1, 2])
@pytest.mark.parametrize("accelerator_type", [NVIDIA_A100, NVIDIA_TESLA_A10G, None])
def test_config_accelerator_type(
ray_start_heterogenous_cluster, num_gpus, accelerator_type
):
def train_func():
# Ensure all workers are scheduled on nodes with specified accelerators
assigned_resources = ray.get_runtime_context().get_assigned_resources()
assert assigned_resources["GPU"] == num_gpus
if accelerator_type:
accelerator_key = f"{RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"
assert accelerator_key in assigned_resources

trainer = DataParallelTrainer(
train_func,
scaling_config=ScalingConfig(
num_workers=4,
use_gpu=True,
accelerator_type=accelerator_type,
resources_per_worker={"GPU": num_gpus},
),
)
trainer.fit()


if __name__ == "__main__":
import sys
