Placement Group Support #28

Merged: 28 commits, Dec 21, 2020
23 changes: 23 additions & 0 deletions README.md
@@ -160,6 +160,29 @@ setting this explicitly.
The number of XGBoost actors always has to be set manually with
the `num_actors` argument.

Placement Strategies
---------------------
`xgboost_ray` leverages Ray's Placement Group API (https://docs.ray.io/en/master/placement-group.html)
to implement placement strategies for better fault tolerance.

By default, a SPREAD strategy is used for training, which attempts to spread all of the training workers
across the nodes in a cluster on a best-effort basis. This improves fault tolerance, since it minimizes the
number of worker failures when a node goes down, but comes at the cost of increased inter-node communication.
To disable this strategy, set the `USE_SPREAD_STRATEGY` environment variable to 0. If disabled, no
particular placement strategy will be used.

Note that this strategy is used only when `elastic_training` is not used. If `elastic_training` is set to `True`,
no placement strategy is used.

When `xgboost_ray` is used with Ray Tune for hyperparameter tuning, a PACK strategy is used instead. This strategy
attempts to place all workers of each trial on the same node on a best-effort basis, so that a single node failure
is less likely to impact multiple trials.

When a placement strategy is used, `xgboost_ray` will wait 100 seconds for the required resources
to become available, and will fail if the required resources cannot be reserved and the cluster cannot autoscale
to provide them. You can set the `PLACEMENT_GROUP_TIMEOUT_S` environment variable to change
how long this timeout should be.
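
For example, here is a minimal sketch of how these environment variables could be set. They are read when
`xgboost_ray` is imported, so set them before the import:

```python
import os

# Both variables are read at import time by xgboost_ray, so set them first.
os.environ["USE_SPREAD_STRATEGY"] = "0"          # disable the SPREAD strategy
os.environ["PLACEMENT_GROUP_TIMEOUT_S"] = "300"  # wait up to 300s for resources

import xgboost_ray  # noqa: E402
```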

More examples
-------------

162 changes: 124 additions & 38 deletions xgboost_ray/main.py
@@ -10,6 +10,8 @@
import numpy as np

import xgboost as xgb
from ray.util import placement_group
from ray.util.placement_group import PlacementGroup, remove_placement_group
from xgboost.core import XGBoostError

try:
@@ -39,6 +41,12 @@
from xgboost_ray.session import init_session, put_queue, \
set_session_queue

# Whether to use SPREAD placement group strategy for training.
_USE_SPREAD_STRATEGY = int(os.getenv("USE_SPREAD_STRATEGY", 1))

# How long to wait for placement group creation before failing.
PLACEMENT_GROUP_TIMEOUT_S = int(os.getenv("PLACEMENT_GROUP_TIMEOUT_S", 100))


class RayXGBoostTrainingError(RuntimeError):
"""Raised from RayXGBoostActor.train() when the local xgb.train function
@@ -128,8 +136,8 @@ def _ray_get_cluster_cpus():
return ray.cluster_resources().get("CPU", None)


def _get_max_node_cpus():
max_node_cpus = max(
def _get_min_node_cpus():
max_node_cpus = min(
node.get("Resources", {}).get("CPU", 0.0) for node in ray.nodes())
return max_node_cpus if max_node_cpus > 0.0 else _ray_get_cluster_cpus()

@@ -406,18 +414,42 @@ def predict(self, model: xgb.Booster, data: RayDMatrix, **kwargs):
return predictions


def _autodetect_resources(ray_params: Union[None, RayParams, Dict] = None,
use_tree_method: bool = False) -> Tuple[int, int]:
gpus_per_actor = ray_params.gpus_per_actor
cpus_per_actor = ray_params.cpus_per_actor

# Automatically set gpus_per_actor if left at the default value
if gpus_per_actor == -1:
gpus_per_actor = 0
if use_tree_method:
gpus_per_actor = 1

# Automatically set cpus_per_actor if left at the default value
# Will be set to the number of cluster CPUs divided by the number of
# actors, bounded by the minimum number of CPUs across actors nodes.
if cpus_per_actor <= 0:
cluster_cpus = _ray_get_cluster_cpus() or 1
cpus_per_actor = min(
int(_get_min_node_cpus() or 1),
Collaborator:
Changing this from max to min leads to quite different behavior, but I guess we can assume that people with heterogeneous cluster setups might set this manually anyway.

Collaborator:
(So I'm fine with this change.)

Contributor Author:
Just to be clear, for homogeneous clusters this does not make a difference, since min == max. But for heterogeneous clusters I believe using max is actually incorrect behavior.

Collaborator:
min is definitely the safe choice. Generally, max makes sure that at least one node can schedule an actor, while min makes sure that each node can schedule actors. I had max in as a sanity check, to make sure that the number of requested CPUs is never larger than the largest node size, just as an upper bound. For scheduling this was desirable in the case where we have a small head node and large workers (e.g. for GPU training, where I used this setup).

Generally we should just stress setting the number of CPUs per actor manually. We might even want to log a warning if we connect to an existing cluster and the number of CPUs per actor is not set. Let's discuss this sometime!

int(cluster_cpus // ray_params.num_actors))
return cpus_per_actor, gpus_per_actor
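
As a small illustration of the min-vs-max discussion above, here is a sketch of what this bound does on a
hypothetical heterogeneous cluster (the node sizes and actor count below are made-up values):

```python
# Hypothetical cluster: one 4-CPU head node and two 16-CPU workers,
# i.e. 36 CPUs in total and a smallest node of 4 CPUs.
cluster_cpus = 36
min_node_cpus = 4  # what _get_min_node_cpus() would return here
num_actors = 4

# With the `min` bound, each actor requests min(4, 36 // 4) == 4 CPUs,
# so every node, including the small head node, can schedule an actor.
cpus_per_actor = min(int(min_node_cpus or 1), int(cluster_cpus // num_actors))
assert cpus_per_actor == 4

# With the previous `max` bound (largest node: 16 CPUs), the request would be
# min(16, 36 // 4) == 9 CPUs, which the 4-CPU head node can never satisfy.
old_cpus_per_actor = min(16, int(cluster_cpus // num_actors))
assert old_cpus_per_actor == 9
```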


def _create_actor(rank: int,
num_actors: int,
num_cpus_per_actor: int,
num_gpus_per_actor: int,
resources_per_actor: Optional[Dict] = None,
placement_group: Optional[PlacementGroup] = None,
queue: Optional[Queue] = None,
checkpoint_frequency: int = 5) -> ActorHandle:

return RayXGBoostActor.options(
num_cpus=num_cpus_per_actor,
num_gpus=num_gpus_per_actor,
resources=resources_per_actor).remote(
resources=resources_per_actor,
placement_group=placement_group).remote(
rank=rank,
num_actors=num_actors,
queue=queue,
@@ -495,23 +527,54 @@ def _get_actor_alive_status(actors: List[ActorHandle],
def _shutdown(actors: List[ActorHandle],
queue: Optional[Queue] = None,
event: Optional[Event] = None,
placement_group: Optional[PlacementGroup] = None,
force: bool = False):
for i in range(len(actors)):
actor = actors[i]
if actor is None:
continue
if force:
alive_actors = [a for a in actors if a is not None]
if force:
for actor in alive_actors:
ray.kill(actor)
else:
try:
ray.get(actor.__ray_terminate__.remote())
except RayActorError:
else:
done_refs = [a.__ray_terminate__.remote() for a in alive_actors]
# Wait 5 seconds for actors to die gracefully.
done, not_done = ray.wait(done_refs, timeout=5)
if not_done:
# If not all actors were able to die gracefully, then kill them.
for actor in alive_actors:
ray.kill(actor)
for i in range(len(actors)):
actors[i] = None
if queue:
queue.shutdown()
if event:
event.shutdown()
if placement_group:
remove_placement_group(placement_group)


def _create_placement_group(cpus_per_actor, gpus_per_actor,
resources_per_actor, num_actors, strategy):
resources_per_bundle = {"CPU": cpus_per_actor, "GPU": gpus_per_actor}
extra_resources_per_bundle = {} if resources_per_actor is None else \
resources_per_actor
# Create placement group for training worker colocation.
bundles = [{
**resources_per_bundle,
**extra_resources_per_bundle
} for _ in range(num_actors)]
pg = placement_group(bundles, strategy=strategy)
# Wait for placement group to get created.
logger.debug("Waiting for placement group to start.")
ready, _ = ray.wait([pg.ready()], timeout=PLACEMENT_GROUP_TIMEOUT_S)
if ready:
logger.debug("Placement group has started.")
else:
raise TimeoutError("Placement group creation timed out. Make sure "
"your cluster either has enough resources or use "
"an autoscaling cluster. Current resources "
"available: {}, resources requested by the "
"placement group: {}".format(
ray.available_resources(), pg.bundle_specs))
return pg
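
For reference, a sketch of the bundle list this helper assembles for a hypothetical configuration of 4 actors
with 8 CPUs and 1 GPU each (the numbers are made up for illustration):

```python
# Equivalent to calling _create_placement_group(cpus_per_actor=8,
# gpus_per_actor=1, resources_per_actor=None, num_actors=4, strategy=...).
bundles = [{"CPU": 8, "GPU": 1} for _ in range(4)]

# With strategy="SPREAD" (plain training) Ray tries to place each bundle on a
# different node; with strategy="PACK" (under Ray Tune) it tries to co-locate
# them on a single node, both on a best-effort basis.
```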


def _create_communication_processes():
@@ -529,11 +592,14 @@ def _train(params: Dict,
*args,
evals=(),
ray_params: RayParams,
cpus_per_actor: int,
gpus_per_actor: int,
Comment on lines +595 to +596

Collaborator:
Should we still read those from ray_params (and not pass them here)?

Contributor Author:
These have gone through automatic detection already at this point, so they might not be the same as what's in ray_params.

_checkpoint: _Checkpoint,
_additional_results: Dict,
_actors: List,
_queue: Queue,
_stop_event: Event,
_placement_group: PlacementGroup,
_failed_actor_ranks: set,
**kwargs) -> Tuple[xgb.Booster, Dict, Dict]:
"""This is the local train function wrapped by :func:`train() <train>`.
@@ -548,29 +614,6 @@
errors occur. It is called more than once if errors occurred (e.g. an
actor died) and failure handling is enabled.
"""
_assert_ray_support()

if not ray.is_initialized():
ray.init()

gpus_per_actor = ray_params.gpus_per_actor
cpus_per_actor = ray_params.cpus_per_actor

# Automatically set gpus_per_actor if left at the default value
if gpus_per_actor == -1:
gpus_per_actor = 0
if "tree_method" in params and params["tree_method"].startswith("gpu"):
gpus_per_actor = 1

# Automatically set cpus_per_actor if left at the default value
# Will be set to the number of cluster CPUs divided by the number of
# actors, bounded by the maximum number of CPUs across actors nodes.
if cpus_per_actor <= 0:
cluster_cpus = _ray_get_cluster_cpus() or 1
cpus_per_actor = min(
int(_get_max_node_cpus() or 1),
int(cluster_cpus // ray_params.num_actors))

if "nthread" in params:
if params["nthread"] > cpus_per_actor:
raise ValueError(
@@ -604,6 +647,7 @@ def handle_actor_failure(actor_id):
num_cpus_per_actor=cpus_per_actor,
num_gpus_per_actor=gpus_per_actor,
resources_per_actor=ray_params.resources_per_actor,
placement_group=_placement_group,
queue=_queue,
checkpoint_frequency=ray_params.checkpoint_frequency)
# Set actor entry in our list
@@ -810,7 +854,24 @@ def train(params: Dict,
"`dtrain = RayDMatrix(data=data, label=label)`.".format(
type(dtrain)))

_try_add_tune_callback(kwargs)
if not ray.is_initialized():
ray.init()

cpus_per_actor, gpus_per_actor = _autodetect_resources(
ray_params=ray_params,
use_tree_method="tree_method" in params
and params["tree_method"].startswith("gpu"))

if gpus_per_actor == 0 and cpus_per_actor == 0:
raise ValueError("cpus_per_actor and gpus_per_actor both cannot be "
"0. Are you sure your cluster has CPUs available?")

added_tune_callback = _try_add_tune_callback(kwargs)
# Tune currently does not support elastic training.
if added_tune_callback and ray_params.elastic_training:
raise ValueError("Elastic Training cannot be used with Ray Tune. "
"Please disable elastic_training in RayParams in "
"order to use xgboost_ray with Tune.")

if not dtrain.loaded and not dtrain.distributed:
dtrain.load_data(ray_params.num_actors)
@@ -826,7 +887,24 @@
checkpoint = _Checkpoint() # Keep track of latest checkpoint
current_results = {} # Keep track of additional results
actors = [None] * ray_params.num_actors # All active actors

# Create the Queue and Event actors.
queue, stop_event = _create_communication_processes()

placement_strategy = None
if not ray_params.elastic_training:
if added_tune_callback:
placement_strategy = "PACK"
elif bool(_USE_SPREAD_STRATEGY):
placement_strategy = "SPREAD"

if placement_strategy is not None:
pg = _create_placement_group(cpus_per_actor, gpus_per_actor,
ray_params.resources_per_actor,
ray_params.num_actors, placement_strategy)
else:
pg = None

start_actor_ranks = set(range(ray_params.num_actors)) # Start these
while tries <= max_actor_restarts:
try:
@@ -836,11 +914,14 @@
*args,
evals=evals,
ray_params=ray_params,
cpus_per_actor=cpus_per_actor,
gpus_per_actor=gpus_per_actor,
Comment on lines +917 to +918

Collaborator:
Maybe instead we can pass these in through an actor_options kwarg instead of 3 separate args?

Contributor Author:
I'd actually prefer to leave it as is, so it's clear what all of the arguments are. Plus, this is an internal function, so it should be fine to add these extra args. @krfricke any thoughts here?

_checkpoint=checkpoint,
_additional_results=current_results,
_actors=actors,
_queue=queue,
_stop_event=stop_event,
_placement_group=pg,
_failed_actor_ranks=start_actor_ranks,
**kwargs)
break
@@ -864,7 +945,7 @@
f"Sleeping for 10 seconds for cleanup.")
start_again = True

if tries + 1 <= max_actor_restarts:
elif tries + 1 <= max_actor_restarts:
logger.warning(
f"A Ray actor died during training. Trying to restart "
f"and continue training from last checkpoint "
@@ -887,7 +968,12 @@
) from exc
tries += 1

_shutdown(actors=actors, queue=queue, event=stop_event, force=False)
_shutdown(
actors=actors,
queue=queue,
event=stop_event,
placement_group=pg,
force=False)

if isinstance(evals_result, dict):
evals_result.update(train_evals_result)