[train] support memory per worker #42999

Merged · Feb 23, 2024 · 10 commits
63 changes: 26 additions & 37 deletions python/ray/train/_internal/backend_executor.py
@@ -73,12 +73,9 @@ class BackendExecutor:
backend_config: The configurations for this
specific backend.
num_workers: Number of workers to use for training.
num_cpus_per_worker: Number of CPUs to use per worker.
num_gpus_per_worker: Number of GPUs to use per worker.
additional_resources_per_worker (Optional[Dict[str, float]]):
Dictionary specifying the extra resources that will be
requested for each worker in addition to ``num_cpus_per_worker``
and ``num_gpus_per_worker``.
resources_per_worker (Optional[Dict[str, float]]):
Dictionary specifying the resources that will be
requested for each worker. Defaults to {"CPU": 1}.
max_retries: Number of retries when Ray actors fail.
Defaults to 3. Set to -1 for unlimited retries.
"""
@@ -89,17 +86,17 @@ def __init__(
# TODO(xwjiang): Legacy Ray Train trainer clean up!
trial_info: Optional[TrialInfo] = None,
num_workers: int = 1,
num_cpus_per_worker: float = 1,
num_gpus_per_worker: float = 0,
additional_resources_per_worker: Optional[Dict[str, float]] = None,
resources_per_worker: Optional[Dict[str, float]] = None,
max_retries: int = 3,
):
if resources_per_worker is None:
self._resources_per_worker = {"CPU": 1}
else:
self._resources_per_worker = resources_per_worker.copy()

self._backend_config = backend_config
self._backend = backend_config.backend_cls()
self._num_workers = num_workers
self._num_cpus_per_worker = num_cpus_per_worker
self._num_gpus_per_worker = num_gpus_per_worker
self._additional_resources_per_worker = additional_resources_per_worker
self._max_failures = max_retries
if self._max_failures < 0:
self._max_failures = float("inf")
@@ -133,9 +130,7 @@ def start(
placement_group = self._placement_group or "default"
self.worker_group = WorkerGroup(
num_workers=self._num_workers,
num_cpus_per_worker=self._num_cpus_per_worker,
num_gpus_per_worker=self._num_gpus_per_worker,
additional_resources_per_worker=self._additional_resources_per_worker,
resources_per_worker=self._resources_per_worker,
actor_cls=train_cls,
actor_cls_args=train_cls_args,
actor_cls_kwargs=train_cls_kwargs,
@@ -175,18 +170,20 @@ def _set_driver_dataset_context(ctx: DataContext):
)
)

if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled:
if (
self._resources_per_worker.get("GPU", 0) > 0
and share_cuda_visible_devices_enabled
):
self._share_cuda_visible_devices()
elif self._additional_resources_per_worker:
for resource_config in self._resource_configs:
if self._is_share_resources_enabled(
resource_config.resource_name,
resource_config.resource_enable_sharing_env_var,
):
self._share_resource_ids(
resource_config.resource_name,
resource_config.share_resource_ids_env_var,
)
for resource_config in self._resource_configs:
if self._is_share_resources_enabled(
resource_config.resource_name,
resource_config.resource_enable_sharing_env_var,
):
self._share_resource_ids(
resource_config.resource_name,
resource_config.share_resource_ids_env_var,
)
self._backend.on_start(self.worker_group, self._backend_config)
except RayActorError as exc:
logger.exception(str(exc))
@@ -221,15 +218,9 @@ def _create_placement_group(self):
)

if should_create_placement_group:
additional_resources_per_worker = (
self._additional_resources_per_worker or {}
)
bundle = {
"CPU": self._num_cpus_per_worker,
"GPU": self._num_gpus_per_worker,
**additional_resources_per_worker,
}
bundles = [bundle.copy() for _ in range(self._num_workers)]
bundles = [
self._resources_per_worker.copy() for _ in range(self._num_workers)
]

use_spread = bool(env_integer(TRAIN_ENABLE_WORKER_SPREAD_ENV, 0))
strategy = "SPREAD" if use_spread else "PACK"
@@ -348,9 +339,7 @@ def _is_share_resources_enabled(self, resource_name: str, enable_sharing_env: st
enable_sharing_env: The name of the environment variable
to check.
"""
has_resource_requested = (
self._additional_resources_per_worker.get(resource_name, 0) > 0
)
has_resource_requested = self._resources_per_worker.get(resource_name, 0) > 0
return has_resource_requested and ray_constants.env_bool(
enable_sharing_env, True
)
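Taken together, the backend_executor.py changes replace the old num_cpus_per_worker / num_gpus_per_worker / additional_resources_per_worker trio with a single resources_per_worker dict (defaulting to {"CPU": 1}) that drives both the placement group bundles and the device-sharing checks. A minimal sketch of that mapping, with illustrative values rather than anything taken from this PR:

# Not part of the diff: restates how the new per-worker resource dict is used above.
resources_per_worker = {"CPU": 1, "GPU": 1, "memory": 2 * 1024**3}
num_workers = 4

# One placement group bundle per worker, copied straight from the dict
# (replaces the old merge of CPU, GPU, and additional resources).
bundles = [resources_per_worker.copy() for _ in range(num_workers)]

# Built-in keys are read with .get(), so GPU and custom accelerator resources
# are optional and default to 0.
share_gpu_devices = resources_per_worker.get("GPU", 0) > 0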
38 changes: 18 additions & 20 deletions python/ray/train/_internal/worker_group.py
@@ -112,14 +112,9 @@ class WorkerGroup:
Args:
num_workers: The number of workers (Ray actors) to launch.
Defaults to 1.
num_cpus_per_worker: The number of CPUs to reserve for each
worker. Fractional values are allowed. Defaults to 1.
num_gpus_per_worker: The number of GPUs to reserve for each
worker. Fractional values are allowed. Defaults to 0.
additional_resources_per_worker (Optional[Dict[str, float]]):
Dictionary specifying the extra resources that will be
requested for each worker in addition to ``num_cpus_per_worker``
and ``num_gpus_per_worker``.
resources_per_worker (Optional[Dict[str, float]]):
Dictionary specifying the resources that will be
requested for each worker. Defaults to {"CPU": 1}.
actor_cls (Optional[Type]): If specified use this class as the
remote actors.
remote_cls_args, remote_cls_kwargs: If ``remote_cls`` is provided,
@@ -142,26 +137,28 @@ class WorkerGroup:
def __init__(
self,
num_workers: int = 1,
num_cpus_per_worker: float = 1,
num_gpus_per_worker: float = 0,
additional_resources_per_worker: Optional[Dict[str, float]] = None,
resources_per_worker: Optional[Dict[str, float]] = None,
actor_cls: Type = None,
actor_cls_args: Optional[Tuple] = None,
actor_cls_kwargs: Optional[Dict] = None,
placement_group: Union[PlacementGroup, str] = "default",
):
if resources_per_worker is None:
resources_per_worker = {"CPU": 1}
else:
resources_per_worker = resources_per_worker.copy()

if num_workers <= 0:
raise ValueError(
"The provided `num_workers` must be greater "
f"than 0. Received num_workers={num_workers} "
f"instead."
)
if num_cpus_per_worker < 0 or num_gpus_per_worker < 0:

if any(v < 0 for v in resources_per_worker.values()):
raise ValueError(
"The number of CPUs and GPUs per worker must "
"not be negative. Received "
f"num_cpus_per_worker={num_cpus_per_worker} and "
f"num_gpus_per_worker={num_gpus_per_worker}."
"The number of resources per worker must not be negative. "
f"Received resources_per_worker={resources_per_worker}."
)

if (actor_cls_args or actor_cls_kwargs) and not actor_cls:
@@ -171,9 +168,9 @@ def __init__(
)

self.num_workers = num_workers
self.num_cpus_per_worker = num_cpus_per_worker
self.num_gpus_per_worker = num_gpus_per_worker
self.additional_resources_per_worker = additional_resources_per_worker
self.num_cpus_per_worker = resources_per_worker.pop("CPU", 0)
self.num_gpus_per_worker = resources_per_worker.pop("GPU", 0)
self.memory_per_worker = resources_per_worker.pop("memory", 0)
self.workers = []
self._base_cls = create_executable_class(actor_cls)
assert issubclass(self._base_cls, RayTrainWorker)
@@ -188,7 +185,8 @@ def __init__(
self._remote_cls = ray.remote(
num_cpus=self.num_cpus_per_worker,
num_gpus=self.num_gpus_per_worker,
resources=self.additional_resources_per_worker,
memory=self.memory_per_worker,
resources=resources_per_worker,
)(self._base_cls)
self.start()

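For reference, a self-contained sketch of the option splitting that WorkerGroup.__init__ now performs: the built-in CPU, GPU, and memory keys are popped into the matching ray.remote options, and whatever remains is forwarded as custom resources. The example values and the TrainWorker class are illustrative, not taken from the PR:

import ray

# Illustrative per-worker request; "neuron_cores" stands in for any custom resource.
resources_per_worker = {"CPU": 1, "memory": 512 * 1024**2, "neuron_cores": 1}

opts = resources_per_worker.copy()
num_cpus = opts.pop("CPU", 0)   # built-in CPU resource
num_gpus = opts.pop("GPU", 0)   # built-in GPU resource
memory = opts.pop("memory", 0)  # per-actor memory reservation, in bytes
# Anything left in `opts` is passed through as custom resources.

class TrainWorker:
    def ping(self) -> str:
        return "ok"

# Same pattern as the diff: wrap the class with ray.remote(...) options.
RemoteTrainWorker = ray.remote(
    num_cpus=num_cpus, num_gpus=num_gpus, memory=memory, resources=opts
)(TrainWorker)
# RemoteTrainWorker.remote() will only be scheduled on a node that can satisfy
# the memory reservation and the "neuron_cores" request.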
6 changes: 1 addition & 5 deletions python/ray/train/data_parallel_trainer.py
@@ -439,8 +439,6 @@ def training_loop(self) -> None:
discard_returns=True,
)

additional_resources_per_worker = scaling_config.additional_resources_per_worker

trial_info = TrialInfo(
name=session.get_trial_name(),
id=session.get_trial_id(),
@@ -454,9 +452,7 @@ def training_loop(self) -> None:
backend_config=self._backend_config,
trial_info=trial_info,
num_workers=scaling_config.num_workers,
num_cpus_per_worker=scaling_config.num_cpus_per_worker,
num_gpus_per_worker=scaling_config.num_gpus_per_worker,
additional_resources_per_worker=additional_resources_per_worker,
resources_per_worker=scaling_config._resources_per_worker_not_none,
max_retries=0,
)

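From the user's side, this is what the PR title amounts to: a "memory" entry in ScalingConfig.resources_per_worker is now plumbed through to each training worker. A hedged end-to-end sketch; TorchTrainer and the empty train_func are illustrative choices, not part of this diff:

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func():
    pass  # training loop goes here

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=2,
        # Reserve 2 GiB per worker alongside the usual CPU request.
        resources_per_worker={"CPU": 1, "memory": 2 * 1024**3},
    ),
)
# result = trainer.fit()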
12 changes: 5 additions & 7 deletions python/ray/train/tests/test_backend.py
@@ -323,7 +323,7 @@ def get_resources():

os.environ[ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV] = "1"
e = BackendExecutor(
config, num_workers=num_workers, num_cpus_per_worker=0, num_gpus_per_worker=1
config, num_workers=num_workers, resources_per_worker={"GPU": 1}
)
e.start()
_start_training(e, get_resources)
@@ -369,7 +369,7 @@ def get_resources():

os.environ[ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV] = "1"
e = BackendExecutor(
config, num_workers=num_workers, num_cpus_per_worker=0, num_gpus_per_worker=0.5
config, num_workers=num_workers, resources_per_worker={"GPU": 0.5}
)
e.start()
_start_training(e, get_resources)
@@ -408,7 +408,7 @@ def get_resources():

os.environ[ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV] = "1"
e = BackendExecutor(
config, num_workers=num_workers, num_cpus_per_worker=0, num_gpus_per_worker=2
config, num_workers=num_workers, resources_per_worker={"GPU": 2}
)
e.start()
_start_training(e, get_resources)
@@ -443,8 +443,7 @@ def get_resources():
e = BackendExecutor(
config,
num_workers=num_workers,
num_cpus_per_worker=0,
additional_resources_per_worker={"neuron_cores": 1},
resources_per_worker={"neuron_cores": 1},
)
e.start()
_start_training(e, get_resources)
@@ -481,8 +480,7 @@ def get_resources():
e = BackendExecutor(
config,
num_workers=num_workers,
num_cpus_per_worker=0,
additional_resources_per_worker={"neuron_cores": 1},
resources_per_worker={"neuron_cores": 1},
)
e.start()
_start_training(e, get_resources)
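The updated tests above cover GPU and neuron_core requests through the new argument; a memory-based variant would presumably follow the same shape. A hypothetical sketch (the test name and fixture are not in this PR):

from ray.train._internal.backend_executor import BackendExecutor
from ray.train.backend import BackendConfig

def test_memory_request_sketch(ray_start_4_cpus):  # hypothetical test and fixture
    config = BackendConfig()
    e = BackendExecutor(
        config,
        num_workers=2,
        resources_per_worker={"memory": 100 * 1024**2},
    )
    e.start()
    # Each worker actor should now reserve 100 MiB of the node's memory resource.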
58 changes: 53 additions & 5 deletions python/ray/train/tests/test_worker_group.py
@@ -5,6 +5,7 @@

import ray
import ray._private.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.train._internal.worker_group import Worker, WorkerGroup, WorkerMetadata


@@ -32,6 +33,29 @@ def ray_start_2_cpus_and_neuron_core_accelerator():
ray.shutdown()


@pytest.fixture
def ray_start_2_cpus_and_10kb_memory():
address_info = ray.init(num_cpus=2, _memory=10_000)
yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()


@pytest.fixture
def ray_start_5_nodes_with_memory():
cluster = Cluster()
for _ in range(4):
cluster.add_node(num_cpus=4, memory=500)
cluster.add_node(num_cpus=4, memory=2_000)

ray.init(address=cluster.address)

yield

ray.shutdown()
cluster.shutdown()


def test_worker_creation(ray_start_2_cpus):
assert ray.available_resources()["CPU"] == 2
wg = WorkerGroup(num_workers=2)
@@ -44,14 +68,35 @@ def test_worker_creation_num_cpus(ray_start_2_cpus):

def test_worker_creation_num_cpus(ray_start_2_cpus):
assert ray.available_resources()["CPU"] == 2
wg = WorkerGroup(num_cpus_per_worker=2)
wg = WorkerGroup(resources_per_worker={"CPU": 2})
time.sleep(1)
assert len(wg.workers) == 1
# Make sure both CPUs are being used by the actor.
assert "CPU" not in ray.available_resources()
wg.shutdown()


def test_worker_creation_with_memory(ray_start_5_nodes_with_memory):
resources_per_worker = {"memory": 1_000}
wg = WorkerGroup(num_workers=2, resources_per_worker=resources_per_worker)
assert len(wg.workers) == 2

nodes = ray.nodes()
large_node = [node for node in nodes if node["Resources"]["memory"] == 2_000][0]
large_node_id = large_node["NodeID"]

def validate_scheduling():
resources = ray.get_runtime_context().get_assigned_resources()
assert resources == resources_per_worker, "Resources should include memory."

node_id = ray.get_runtime_context().get_node_id()
assert (
node_id == large_node_id
), "Workers should be scheduled on the large node."

wg.execute(validate_scheduling)


def test_worker_shutdown(ray_start_2_cpus):
assert ray.available_resources()["CPU"] == 2
wg = WorkerGroup(num_workers=2)
@@ -79,7 +124,7 @@ def test_worker_restart(ray_start_2_cpus):

def test_worker_with_gpu_ids(ray_start_2_cpus_and_gpus):
num_gpus = 2
wg = WorkerGroup(num_workers=2, num_gpus_per_worker=1)
wg = WorkerGroup(num_workers=2, resources_per_worker={"GPU": 1})
assert len(wg.workers) == 2
time.sleep(1)
assert ray_constants.GPU not in ray.available_resources()
@@ -98,7 +143,7 @@ def test_worker_with_neuron_core_accelerator_ids(
):
num_nc = 2
wg = WorkerGroup(
num_workers=2, additional_resources_per_worker={ray_constants.NEURON_CORES: 1}
num_workers=2, resources_per_worker={ray_constants.NEURON_CORES: 1}
)
assert len(wg.workers) == 2
time.sleep(1)
@@ -271,10 +316,13 @@ def test_bad_resources(ray_start_2_cpus):
WorkerGroup(num_workers=-1)

with pytest.raises(ValueError):
WorkerGroup(num_cpus_per_worker=-1)
WorkerGroup(resources_per_worker={"CPU": -1})

with pytest.raises(ValueError):
WorkerGroup(resources_per_worker={"GPU": -1})

with pytest.raises(ValueError):
WorkerGroup(num_gpus_per_worker=-1)
WorkerGroup(resources_per_worker={"memory": -1})


def test_placement_group(ray_start_2_cpus):