[Train] Add accelerator ids to workers and share neuron_cores by default #39091

Merged · 18 commits · Sep 29, 2023
Changes from 2 commits
91 changes: 80 additions & 11 deletions python/ray/train/_internal/backend_executor.py
@@ -4,6 +4,7 @@
from typing import Callable, Dict, List, Optional, Tuple, Type, TypeVar, Any

import ray
import ray._private.ray_constants as ray_constants
from ray.data import Dataset
from ray._private.ray_constants import env_integer
from ray.air.config import CheckpointConfig
@@ -27,6 +28,7 @@
TRAIN_ENABLE_WORKER_SPREAD_ENV,
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV,
DISABLE_LAZY_CHECKPOINTING_ENV,
ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV,
)
from ray.util.placement_group import get_current_placement_group, remove_placement_group

@@ -153,6 +155,9 @@ def start(

if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled:
self._share_cuda_visible_devices()
elif self._additional_resources_per_worker:
if self._share_neuron_core_ids_enabled():
self._share_neuron_core_ids()
self._backend.on_start(self.worker_group, self._backend_config)
except RayActorError as exc:
logger.exception(str(exc))
@@ -245,32 +250,96 @@ def _share_cuda_visible_devices(self):
- Worker2: "0,1"

"""

node_ids_and_gpu_ids = [
(w.metadata.node_id, w.metadata.gpu_ids) for w in self.worker_group.workers
(w.metadata.node_id, w.metadata.gpu_and_accelerator_ids[ray_constants.GPU])
for w in self.worker_group.workers
]
self._share_runtime_ids(
node_ids_and_runtime_ids=node_ids_and_gpu_ids,
env_var=ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR,
)

def _share_neuron_core_ids(self):
"""Sets NEURON_RT_VISIBLE_CORES on all workers.

For each worker, NEURON_RT_VISIBLE_CORES will be set to the
NEURON_CORE IDs visible to all workers on that worker's node.

This allows the workers on the same node to communicate
with one another.

Example:

Setup:
- Node1:
- Worker1: {0, 1}
- Worker2: {2, 3}
- Node2:
- Worker3: {0, 1}

NEURON_RT_VISIBLE_CORES:
- Worker1: "0,1,2,3"
- Worker2: "0,1,2,3"
- Worker2: "0,1"
"""
node_ids_and_neuron_core_ids = [
(
w.metadata.node_id,
w.metadata.gpu_and_accelerator_ids[ray_constants.NEURON_CORES],
)
for w in self.worker_group.workers
]
self._share_runtime_ids(
node_ids_and_runtime_ids=node_ids_and_neuron_core_ids,
env_var=ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR,
)

def _share_runtime_ids(
self, node_ids_and_runtime_ids: List[Tuple[str, List[str]]], env_var: str
):
"""Sets the given env_var on all workers.
Args:
node_ids_and_runtime_ids: A list of tuples of node_id and
list of runtime_ids.
env_var: The name of the environment variable to set.
"""
node_id_to_worker_id = defaultdict(set)
node_id_to_gpu_ids = defaultdict(set)
node_id_to_runtime_ids = defaultdict(set)

for worker_id, (node_id, gpu_ids) in enumerate(node_ids_and_gpu_ids):
for worker_id, (node_id, runtime_id) in enumerate(node_ids_and_runtime_ids):
node_id_to_worker_id[node_id].add(worker_id)
node_id_to_gpu_ids[node_id].update(gpu_ids)
node_id_to_runtime_ids[node_id].update(runtime_id)

futures = []
for node_id, gpu_ids in node_id_to_gpu_ids.items():
gpu_ids = sorted(gpu_ids)
all_gpu_ids = ",".join(gpu_ids)
for node_id, runtime_ids in node_id_to_runtime_ids.items():
runtime_ids = sorted(runtime_ids)
all_runtime_ids = ",".join(runtime_ids)

def set_gpu_ids():
os.environ["CUDA_VISIBLE_DEVICES"] = all_gpu_ids
def set_runtime_ids():
os.environ[env_var] = all_runtime_ids

for worker_id in node_id_to_worker_id[node_id]:
futures.append(
self.worker_group.execute_single_async(worker_id, set_gpu_ids)
self.worker_group.execute_single_async(worker_id, set_runtime_ids)
)
ray.get(futures)

def _share_neuron_core_ids_enabled(self):
"""Whether to share NEURON_RT_VISIBLE_CORES on all workers.
This is enabled by default if neuron_cores are requested for
workers. Users can disable it by setting the
TRAIN_ENABLE_SHARE_NEURON_RT_VISIBLE_CORES environment variable to "0".
"""
return bool(
env_integer(
ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV,
self._additional_resources_per_worker.get(
ray_constants.NEURON_CORES, None
)
is not None,
)
)

def _create_rank_world_size_mappings(self) -> List[Dict]:
"""Create rank and world size mappings for workers.
There are three maps returned:
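For reference, a minimal standalone sketch (not part of this diff) of the per-node merge that _share_runtime_ids performs: worker-local accelerator IDs are unioned per node, sorted, and joined into the comma-separated value written to the visibility environment variable. The helper name merge_runtime_ids is hypothetical and used only for illustration.

from collections import defaultdict
from typing import Dict, List, Tuple


def merge_runtime_ids(
    node_ids_and_runtime_ids: List[Tuple[str, List[str]]]
) -> Dict[str, str]:
    # Union the IDs of all workers on each node, then sort and join them into
    # the string assigned to e.g. NEURON_RT_VISIBLE_CORES or CUDA_VISIBLE_DEVICES.
    node_id_to_runtime_ids = defaultdict(set)
    for node_id, runtime_ids in node_ids_and_runtime_ids:
        node_id_to_runtime_ids[node_id].update(runtime_ids)
    return {
        node_id: ",".join(sorted(ids))
        for node_id, ids in node_id_to_runtime_ids.items()
    }


# Mirrors the docstring example: two workers on Node1, one on Node2.
print(
    merge_runtime_ids(
        [("Node1", ["0", "1"]), ("Node1", ["2", "3"]), ("Node2", ["0", "1"])]
    )
)
# -> {'Node1': '0,1,2,3', 'Node2': '0,1'}
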
33 changes: 29 additions & 4 deletions python/ray/train/_internal/worker_group.py
@@ -6,6 +6,7 @@
from typing import Callable, List, TypeVar, Optional, Dict, Type, Tuple, Union

import ray
import ray._private.ray_constants as ray_constants
from ray.actor import ActorHandle
from ray.air._internal.util import skip_exceptions, exception_cause
from ray.types import ObjectRef
@@ -44,14 +45,14 @@ class WorkerMetadata:
node_id: ID of the node this worker is on.
node_ip: IP address of the node this worker is on.
hostname: Hostname that this worker is on.
gpu_ids: List of CUDA IDs available to this worker.
gpu_and_accelerator_ids: Map of resource name to the GPU or accelerator
IDs (e.g., AWS NeuronCore) available to this worker.
pid: Process ID of this worker.
"""

node_id: str
node_ip: str
hostname: str
gpu_ids: Optional[List[str]]
gpu_and_accelerator_ids: Dict[str, List[str]]
pid: int


@@ -86,18 +87,42 @@ def construct_metadata() -> WorkerMetadata:
node_id = ray.get_runtime_context().get_node_id()
node_ip = ray.util.get_node_ip_address()
hostname = socket.gethostname()
gpu_ids = [str(gpu_id) for gpu_id in ray.get_gpu_ids()]
pid = os.getpid()

return WorkerMetadata(
node_id=node_id,
node_ip=node_ip,
hostname=hostname,
gpu_ids=gpu_ids,
gpu_and_accelerator_ids=_get_gpu_and_accelerator_ids(),
pid=pid,
)


def _get_gpu_and_accelerator_ids() -> Dict[str, List[str]]:
"""Get GPU and accelerator IDs from runtime context for given actor/worker.

Returns:
A dictionary mapping resource names to a list of resource IDs.
For example,
{
"GPU": ["0", "1"],
"neuron_cores": ["0", "1"]
}
"""
gpu_and_accelerator_ids = defaultdict(list)

resource_ids = ray.get_runtime_context().get_resource_ids()
gpu_ids = resource_ids[ray_constants.GPU]
neuron_core_ids = resource_ids[ray_constants.NEURON_CORES]

gpu_and_accelerator_ids[ray_constants.GPU].extend(gpu_ids)
gpu_and_accelerator_ids[ray_constants.NEURON_CORES].extend(neuron_core_ids)

if len(gpu_ids) > 0 and len(neuron_core_ids) > 0:
raise RuntimeError("Cannot support both GPU and Neuron Core IDs on the same worker.")
return gpu_and_accelerator_ids


class WorkerGroup:
"""Group of Ray Actors that can execute arbitrary functions.

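As a usage sketch (assumptions flagged in comments), the IDs that _get_gpu_and_accelerator_ids reads can be inspected from inside a Ray task scheduled with the custom "neuron_cores" resource; whether the runtime context reports that key for a simulated resource depends on the Ray version, so the lookups below fall back to empty lists.

import ray
import ray._private.ray_constants as ray_constants

ray.init(resources={"neuron_cores": 2})


@ray.remote(resources={"neuron_cores": 1})
def show_assigned_ids():
    # Same call and keys the new _get_gpu_and_accelerator_ids uses.
    resource_ids = ray.get_runtime_context().get_resource_ids()
    return {
        ray_constants.GPU: resource_ids.get(ray_constants.GPU, []),
        ray_constants.NEURON_CORES: resource_ids.get(ray_constants.NEURON_CORES, []),
    }


print(ray.get(show_assigned_ids.remote()))
# Expected shape, e.g.: {'GPU': [], 'neuron_cores': ['0']}
ray.shutdown()
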
5 changes: 5 additions & 0 deletions python/ray/train/constants.py
@@ -65,6 +65,10 @@ def _get_defaults_results_dir() -> str:
# Backend.share_cuda_visible_devices. 1 for True, 0 for False.
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV = "TRAIN_ENABLE_SHARE_CUDA_VISIBLE_DEVICES"

# Integer value which controls whether Neuron RT visible cores are shared
# across workers on the same node. 1 for True (default), 0 for False.
ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV = "TRAIN_ENABLE_SHARE_NEURON_RT_VISIBLE_CORES"

# Integer value which indicates the number of seconds to wait when creating
# the worker placement group before timing out.
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV = "TRAIN_PLACEMENT_GROUP_TIMEOUT_S"
@@ -79,6 +83,7 @@ def _get_defaults_results_dir() -> str:
TRAIN_ENV_VARS = {
ENABLE_DETAILED_AUTOFILLED_METRICS_ENV,
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV,
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV,
TRAIN_ENABLE_WORKER_SPREAD_ENV,
RAY_AIR_NEW_PERSISTENCE_MODE,
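A short usage sketch of the new opt-out: because sharing is on by default whenever neuron_cores are requested, a user who wants each worker pinned to only its own cores sets the variable to "0" before the trainer starts (the same mechanism the disabled-sharing test below exercises).

import os

from ray.train.constants import ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV

# Keep NEURON_RT_VISIBLE_CORES scoped to each worker's own cores.
os.environ[ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV] = "0"
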
14 changes: 14 additions & 0 deletions python/ray/train/tests/conftest.py
@@ -55,6 +55,20 @@ def ray_2_node_2_gpu():
cluster.shutdown()


@pytest.fixture
def ray_2_node_2_neuron_cores():
cluster = Cluster()
for _ in range(2):
cluster.add_node(num_cpus=4, resources={"neuron_cores": 2})

ray.init(address=cluster.address)

yield

ray.shutdown()
cluster.shutdown()


@pytest.fixture
def ray_start_2_cpus():
address_info = ray.init(num_cpus=2)
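A quick sanity-check sketch (hypothetical test, not part of this diff) for the new fixture: the two simulated nodes should expose four neuron_cores in total as a custom cluster resource.

import ray


def test_cluster_has_neuron_cores(ray_2_node_2_neuron_cores):
    # Two nodes x two simulated cores each, registered as a custom resource.
    assert ray.cluster_resources().get("neuron_cores") == 4
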
84 changes: 77 additions & 7 deletions python/ray/train/tests/test_backend.py
@@ -7,6 +7,7 @@
import time

import ray
import ray._private.ray_constants as ray_constants
from ray import train
from ray.air._internal.util import StartTraceback

Expand All @@ -25,6 +26,7 @@
from ray.train.constants import (
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
TRAIN_ENABLE_WORKER_SPREAD_ENV,
ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV,
)
from ray.train.tensorflow import TensorflowConfig
from ray.train.torch import TorchConfig
@@ -97,7 +99,7 @@ def mock_add_workers(self, num_workers):
node_id=0,
node_ip=str(i % 2),
hostname=0,
gpu_ids=[0],
gpu_and_accelerator_ids={"GPU": ["0"]},
pid=0,
)
worker.metadata = metadata
@@ -307,12 +309,6 @@ def check_process_group():
def test_cuda_visible_devices(ray_2_node_2_gpu, worker_results):
config = TestConfig()

if worker_results[0] != len(worker_results[1]):
raise ValueError(
"Invalid test parameter. Length of expected result should "
"match number of workers."
)

def get_resources():
cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
# Sort the cuda visible devices to have exact match with expected result.
@@ -419,6 +415,80 @@ def get_resources():
assert results == expected_results


@pytest.mark.parametrize(
"worker_results",
[
(1, [[0]]),
(2, [[0, 1]] * 2),
(3, [[0]] + [[0, 1]] * 2),
(4, [[0, 1]] * 4),
],
)
def test_neuron_core_accelerator_ids(ray_2_node_2_neuron_cores, worker_results):
config = TestConfig()

def get_resources():
neuron_runtime_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR]
# Sort the runtime ids to have exact match with expected result.
sorted_devices = [
int(device) for device in sorted(neuron_runtime_ids.split(","))
]
return sorted_devices

num_workers, expected_results = worker_results
# sharing enabled by default
os.environ.pop(ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, None)
e = BackendExecutor(
config,
num_workers=num_workers,
num_cpus_per_worker=0,
additional_resources_per_worker={"neuron_cores": 1},
)
e.start()
_start_training(e, get_resources)
results = e.finish_training()
results.sort()
assert results == expected_results


@pytest.mark.parametrize(
"worker_results",
[
(1, [[0]]),
(2, [[0]] + [[1]]),
(3, [[0]] * 2 + [[1]]),
(4, [[0]] * 2 + [[1]] * 2),
],
)
def test_neuron_core_accelerator_ids_sharing_disabled(
ray_2_node_2_neuron_cores, worker_results
):
config = TestConfig()

def get_resources():
neuron_runtime_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR]
# Sort the runtime ids to have exact match with expected result.
sorted_devices = [
int(device) for device in sorted(neuron_runtime_ids.split(","))
]
return sorted_devices

num_workers, expected_results = worker_results

os.environ[ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV] = "0"
e = BackendExecutor(
config,
num_workers=num_workers,
num_cpus_per_worker=0,
additional_resources_per_worker={"neuron_cores": 1},
)
e.start()
_start_training(e, get_resources)
results = e.finish_training()
results.sort()
assert results == expected_results


def get_node_id_set():
node_id_set = set()
for actor_info in ray._private.state.actors().values():
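For context, a hedged end-user sketch (illustrative only, not part of this diff) of how the behavior surfaces through Ray Train's public API: requesting neuron_cores per worker via ScalingConfig means each training function sees the node-wide NEURON_RT_VISIBLE_CORES unless the opt-out variable above is set to "0".

import os

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_loop_per_worker():
    # With sharing enabled (the new default), co-located workers see the
    # union of their node's Neuron core IDs.
    print(os.environ.get("NEURON_RT_VISIBLE_CORES"))


trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(
        num_workers=2,
        resources_per_worker={"neuron_cores": 1},
    ),
)
trainer.fit()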