[Train] Add accelerator ids to workers and share neuron_cores by default #39091

Merged 18 commits on Sep 29, 2023
113 changes: 101 additions & 12 deletions python/ray/train/_internal/backend_executor.py
@@ -1,9 +1,11 @@
import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple, Type, TypeVar, Any

import ray
import ray._private.ray_constants as ray_constants
from ray.data import Dataset
from ray._private.ray_constants import env_integer
from ray.air.config import CheckpointConfig
@@ -27,6 +29,7 @@
TRAIN_ENABLE_WORKER_SPREAD_ENV,
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV,
DISABLE_LAZY_CHECKPOINTING_ENV,
ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV,
)
from ray.util.placement_group import get_current_placement_group, remove_placement_group

@@ -43,6 +46,25 @@ class TrainingWorkerError(Exception):
"""Raised if a worker fails during training."""


@dataclass
class ResourceConfig:
"""
Resource configuration for sharing resource IDs between workers.

Args:
resource_name: The name of the resource to configure
(Example: "neuron_cores" or "gpu").
resource_enable_sharing_env_var: The environment variable to
check if the resource should be shared.
share_resource_ids_env_var: The environment variable to configure for
sharing the resources with other workers.
"""

resource_name: str
resource_enable_sharing_env_var: str
share_resource_ids_env_var: str


class BackendExecutor:
"""Main execution class for training backends.

@@ -101,6 +123,13 @@ def __init__(
self._checkpoint_upload_from_workers = (
checkpoint_config and checkpoint_config._checkpoint_upload_from_workers
)
self._resource_configs = [
ResourceConfig(
ray_constants.NEURON_CORES,
ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV,
ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR,
)
]
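(Editorial note, not part of the diff: the ResourceConfig pattern above is generic. The hedged sketch below shows how a second accelerator could hypothetically be registered with the same mechanism; in this PR, GPUs actually keep the dedicated _share_cuda_visible_devices path, so the GPU entry is purely illustrative.)

# Illustrative sketch only -- not part of this PR. GPU sharing still goes
# through _share_cuda_visible_devices, but the ResourceConfig pattern would
# allow registering it (or any other accelerator) the same way:
import ray._private.ray_constants as ray_constants
from ray.train._internal.backend_executor import ResourceConfig
from ray.train.constants import (
    ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
    ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV,
)

resource_configs = [
    ResourceConfig(
        ray_constants.NEURON_CORES,
        ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV,
        ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR,
    ),
    ResourceConfig(
        ray_constants.GPU,
        ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
        ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR,
    ),
]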

def start(
self,
@@ -153,6 +182,16 @@ def start(

if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled:
self._share_cuda_visible_devices()
elif self._additional_resources_per_worker:
for resource_config in self._resource_configs:
if self._is_share_resources_enabled(
resource_config.resource_name,
resource_config.resource_enable_sharing_env_var,
):
self._share_resource_ids(
resource_config.resource_name,
resource_config.share_resource_ids_env_var,
)
self._backend.on_start(self.worker_group, self._backend_config)
except RayActorError as exc:
logger.exception(str(exc))
@@ -245,32 +284,82 @@ def _share_cuda_visible_devices(self):
- Worker3: "0,1"

"""
self._share_resource_ids(
ray_constants.GPU, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR
)

node_ids_and_gpu_ids = [
(w.metadata.node_id, w.metadata.gpu_ids) for w in self.worker_group.workers
]
def _share_resource_ids(self, resource: str, env_var: str):
"""Sets the given env_var on all workers.

For each worker, the resource IDs of all workers on that worker's
node are made visible. This allows workers on the same node
to communicate with one another.

Example:

Setup:
- Node1:
- Worker1: {0, 1}
- Worker2: {2, 3}
- Node2:
- Worker3: {0, 1}

NEURON_RT_VISIBLE_CORES/TPU_VISIBLE_CHIPS/...:
- Worker1: "0,1,2,3"
- Worker2: "0,1,2,3"
- Worker3: "0,1"

Args:
resource: The name of the resource/accelerator.
env_var: The name of the environment variable to set.
"""
node_ids_and_resource_ids = [
(
w.metadata.node_id,
w.metadata.resource_ids[resource],
)
for w in self.worker_group.workers
]
node_id_to_worker_id = defaultdict(set)
node_id_to_gpu_ids = defaultdict(set)
node_id_to_resource_ids = defaultdict(set)

for worker_id, (node_id, gpu_ids) in enumerate(node_ids_and_gpu_ids):
for worker_id, (node_id, resource_ids) in enumerate(node_ids_and_resource_ids):
node_id_to_worker_id[node_id].add(worker_id)
node_id_to_gpu_ids[node_id].update(gpu_ids)
node_id_to_resource_ids[node_id].update(resource_ids)

futures = []
for node_id, gpu_ids in node_id_to_gpu_ids.items():
gpu_ids = sorted(gpu_ids)
all_gpu_ids = ",".join(gpu_ids)
for node_id, resource_ids in node_id_to_resource_ids.items():
resource_ids = sorted(resource_ids)
all_resource_ids = ",".join(resource_ids)

def set_gpu_ids():
os.environ["CUDA_VISIBLE_DEVICES"] = all_gpu_ids
def set_resource_ids():
os.environ[env_var] = all_resource_ids

for worker_id in node_id_to_worker_id[node_id]:
futures.append(
self.worker_group.execute_single_async(worker_id, set_gpu_ids)
self.worker_group.execute_single_async(worker_id, set_resource_ids)
)
ray.get(futures)
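(Editorial sketch, not part of the diff: the per-node aggregation above can be run standalone. The node IDs below are hypothetical and the values simply reproduce the docstring example.)

from collections import defaultdict

# Hypothetical (node_id, resource_ids) pairs matching the docstring example:
# two workers on node-1 with cores {0,1} and {2,3}, one worker on node-2.
node_ids_and_resource_ids = [
    ("node-1", ["0", "1"]),  # Worker1
    ("node-1", ["2", "3"]),  # Worker2
    ("node-2", ["0", "1"]),  # Worker3
]

node_id_to_resource_ids = defaultdict(set)
for node_id, resource_ids in node_ids_and_resource_ids:
    node_id_to_resource_ids[node_id].update(resource_ids)

# Every worker on a node ends up seeing the union of that node's IDs.
assert ",".join(sorted(node_id_to_resource_ids["node-1"])) == "0,1,2,3"
assert ",".join(sorted(node_id_to_resource_ids["node-2"])) == "0,1"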

def _is_share_resources_enabled(self, resource_name: str, enable_sharing_env: str):
"""Whether to share resource IDs on all workers
based on enable_sharing_env.

This will return true if resources are requested and greater than 0.
Also, user can disable by configuring the `enable_sharing_env` to "0".

Args:
resource_name: The name of the resource/accelerator.
enable_sharing_env: The name of the environment variable
to check.
"""
has_resource_requested = (
self._additional_resources_per_worker.get(resource_name, 0) > 0
)
return has_resource_requested and ray_constants.env_bool(
enable_sharing_env, True
)

def _create_rank_world_size_mappings(self) -> List[Dict]:
"""Create rank and world size mappings for workers.
There are three maps returned:
9 changes: 5 additions & 4 deletions python/ray/train/_internal/worker_group.py
@@ -44,14 +44,15 @@ class WorkerMetadata:
node_id: ID of the node this worker is on.
node_ip: IP address of the node this worker is on.
hostname: Hostname that this worker is on.
gpu_ids: List of CUDA IDs available to this worker.
resource_ids: Map of accelerator resource names
("GPU", "neuron_cores", ...) to the list of IDs assigned to this worker.
pid: Process ID of this worker.
"""

node_id: str
node_ip: str
hostname: str
gpu_ids: Optional[List[str]]
resource_ids: Dict[str, List[str]]
pid: int


@@ -86,14 +87,14 @@ def construct_metadata() -> WorkerMetadata:
node_id = ray.get_runtime_context().get_node_id()
node_ip = ray.util.get_node_ip_address()
hostname = socket.gethostname()
gpu_ids = [str(gpu_id) for gpu_id in ray.get_gpu_ids()]
resource_ids = ray.get_runtime_context().get_resource_ids()
pid = os.getpid()

return WorkerMetadata(
node_id=node_id,
node_ip=node_ip,
hostname=hostname,
gpu_ids=gpu_ids,
resource_ids=resource_ids,
pid=pid,
)
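(Editorial sketch, not part of the diff: resource_ids now comes from the runtime context rather than ray.get_gpu_ids(). The snippet below shows the shape of that mapping for a worker assigned one GPU; the exact contents depend on the cluster and the accelerators present.)

import ray

ray.init(num_gpus=1)

@ray.remote(num_gpus=1)
def show_resource_ids():
    # Keys are accelerator resource names such as "GPU" or "neuron_cores";
    # values are lists of string IDs assigned to this worker.
    return ray.get_runtime_context().get_resource_ids()

print(ray.get(show_resource_ids.remote()))  # e.g. {"GPU": ["0"]}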

7 changes: 7 additions & 0 deletions python/ray/train/constants.py
@@ -67,6 +67,12 @@ def _get_defaults_results_dir() -> str:
# Backend.share_cuda_visible_devices. 1 for True, 0 for False.
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV = "TRAIN_ENABLE_SHARE_CUDA_VISIBLE_DEVICES"

# Integer value which, if set to 0, disables sharing of neuron-core accelerator
# visible cores across workers. 1 for True (default), 0 for False.
ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV = (
"TRAIN_ENABLE_SHARE_NEURON_CORES_ACCELERATOR"
)
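(Editorial example: because sharing is on by default, a user who wants each worker to see only its own Neuron cores sets this variable to "0" before training starts, as the disabled-sharing test further below does.)

import os

# Sharing of NEURON_RT_VISIBLE_CORES across co-located workers is on by
# default; "0" opts out so each worker only sees its own assigned cores.
os.environ["TRAIN_ENABLE_SHARE_NEURON_CORES_ACCELERATOR"] = "0"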

# Integer value which indicates the number of seconds to wait when creating
# the worker placement group before timing out.
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV = "TRAIN_PLACEMENT_GROUP_TIMEOUT_S"
@@ -85,6 +91,7 @@ def _get_defaults_results_dir() -> str:
TRAIN_ENV_VARS = {
ENABLE_DETAILED_AUTOFILLED_METRICS_ENV,
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV,
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV,
TRAIN_ENABLE_WORKER_SPREAD_ENV,
RAY_AIR_NEW_PERSISTENCE_MODE,
14 changes: 14 additions & 0 deletions python/ray/train/tests/conftest.py
@@ -55,6 +55,20 @@ def ray_2_node_2_gpu():
cluster.shutdown()


@pytest.fixture
def ray_2_node_2_neuron_cores():
cluster = Cluster()
for _ in range(2):
cluster.add_node(num_cpus=4, resources={"neuron_cores": 2})

ray.init(address=cluster.address)

yield

ray.shutdown()
cluster.shutdown()


@pytest.fixture
def ray_start_2_cpus():
address_info = ray.init(num_cpus=2)
84 changes: 77 additions & 7 deletions python/ray/train/tests/test_backend.py
@@ -7,6 +7,7 @@
import time

import ray
import ray._private.ray_constants as ray_constants
from ray import train
from ray.air._internal.util import StartTraceback

Expand All @@ -25,6 +26,7 @@
from ray.train.constants import (
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
TRAIN_ENABLE_WORKER_SPREAD_ENV,
ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV,
)
from ray.train.tensorflow import TensorflowConfig
from ray.train.torch import TorchConfig
Expand Down Expand Up @@ -97,7 +99,7 @@ def mock_add_workers(self, num_workers):
node_id=0,
node_ip=str(i % 2),
hostname=0,
gpu_ids=[0],
resource_ids={"GPU": ["0"]},
pid=0,
)
worker.metadata = metadata
@@ -307,12 +309,6 @@ def check_process_group():
def test_cuda_visible_devices(ray_2_node_2_gpu, worker_results):
config = TestConfig()

if worker_results[0] != len(worker_results[1]):
raise ValueError(
"Invalid test parameter. Length of expected result should "
"match number of workers."
)

def get_resources():
cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
# Sort the cuda visible devices to have exact match with expected result.
@@ -419,6 +415,80 @@ def get_resources():
assert results == expected_results


@pytest.mark.parametrize(
"worker_results",
[
(1, [[0]]),
(2, [[0, 1]] * 2),
(3, [[0]] + [[0, 1]] * 2),
(4, [[0, 1]] * 4),
],
)
def test_neuron_core_accelerator_ids(ray_2_node_2_neuron_cores, worker_results):
config = TestConfig()

def get_resources():
neuron_resource_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR]
# Sort the runtime ids to have exact match with expected result.
sorted_devices = [
int(device) for device in sorted(neuron_resource_ids.split(","))
]
return sorted_devices

num_workers, expected_results = worker_results
# sharing enabled by default
os.environ.pop(ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV, None)
e = BackendExecutor(
config,
num_workers=num_workers,
num_cpus_per_worker=0,
additional_resources_per_worker={"neuron_cores": 1},
)
e.start()
_start_training(e, get_resources)
results = e.finish_training()
results.sort()
assert results == expected_results


@pytest.mark.parametrize(
"worker_results",
[
(1, [[0]]),
(2, [[0]] + [[1]]),
(3, [[0]] * 2 + [[1]]),
(4, [[0]] * 2 + [[1]] * 2),
],
)
def test_neuron_core_accelerator_ids_sharing_disabled(
ray_2_node_2_neuron_cores, worker_results
):
config = TestConfig()

def get_resources():
neuron_resource_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR]
# Sort the runtime ids to have exact match with expected result.
sorted_devices = [
int(device) for device in sorted(neuron_resource_ids.split(","))
]
return sorted_devices

num_workers, expected_results = worker_results

os.environ[ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV] = "0"
e = BackendExecutor(
config,
num_workers=num_workers,
num_cpus_per_worker=0,
additional_resources_per_worker={"neuron_cores": 1},
)
e.start()
_start_training(e, get_resources)
results = e.finish_training()
results.sort()
assert results == expected_results
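(Editorial sketch, not part of the diff: at the user-facing level, the additional_resources_per_worker argument exercised in the tests above corresponds to resources_per_worker in ScalingConfig. A hedged example, assuming the public TorchTrainer/ScalingConfig API:)

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker():
    pass  # training code goes here

# Requests one Neuron core per worker; with sharing enabled (the default),
# each worker's NEURON_RT_VISIBLE_CORES lists all cores used on its node.
trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(
        num_workers=2,
        resources_per_worker={"neuron_cores": 1},
    ),
)
# trainer.fit()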


def get_node_id_set():
node_id_set = set()
for actor_info in ray._private.state.actors().values():