Ray is a general-purpose distributed execution framework. Ray can be used to scale computations from a single node to a cluster of hundreds of nodes without changing any code.
The Python bindings of Ray come with a collection of well-maintained machine learning libraries for hyperparameter optimization and model serving.
The XGBoost-Ray project provides an interface to run XGBoost training and prediction jobs on a Ray cluster. It lets you use distributed data representations, such as Modin dataframes, as well as distributed loading from cloud storage (e.g. Parquet files).
XGBoost-Ray integrates well with the hyperparameter optimization library Ray Tune, and implements advanced fault tolerance handling mechanisms. With Ray you can scale your training jobs to hundreds of nodes just by adding new nodes to a cluster. You can also use Ray to leverage multi-GPU XGBoost training.
Ray can be installed from PyPI like this:
pip install ray
If you're using Ray on a single machine, you don't need to do anything else - XGBoost-Ray will automatically start a local Ray cluster when used.
If you want to use Ray on a cluster, you can use the Ray cluster launcher.
XGBoost-Ray is also available via PyPI:
pip install xgboost_ray
This will install all dependencies needed to run XGBoost on Ray, including Ray itself if it hasn't been installed before.
XGBoost-Ray uses the same API as core XGBoost. There are only two differences:
- Instead of using a xgboost.DMatrix, you'll use a xgboost_ray.RayDMatrix object.
- There is an additional xgboost_ray.RayParams parameter that you can use to configure distributed training.
To run this simple example, you'll need to install scikit-learn (with pip install scikit-learn).
In this example, we will load the breast cancer dataset and train a binary classifier using two actors.
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer
train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)
evals_result = {}
bst = train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1))
bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
    evals_result["train"]["error"][-1]))
The only differences compared to the non-distributed API are the import statement (xgboost_ray instead of xgboost), using the RayDMatrix instead of the DMatrix, and passing a xgboost_ray.RayParams object.
The return object is a regular xgboost.Booster instance.
from xgboost_ray import RayDMatrix, RayParams, predict
from sklearn.datasets import load_breast_cancer
import xgboost as xgb
data, labels = load_breast_cancer(return_X_y=True)
dpred = RayDMatrix(data, labels)
bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=2))
print(pred_ray)
In this example, the data will be split across two actors. The resulting array combines the per-actor predictions in the correct row order.
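To see why the ordering of the result matters, here is a minimal pure-Python sketch of splitting rows across two workers and putting the per-worker outputs back into the original row order. The round-robin split, the helpers `shard_indices` and `merge_in_order`, and the stand-in `fake_predict` function are all assumptions made for illustration; they are not XGBoost-Ray's actual sharding logic.

```python
def shard_indices(num_rows, num_actors):
    """Assign row indices to actors round-robin (an illustrative choice)."""
    return [list(range(actor, num_rows, num_actors))
            for actor in range(num_actors)]


def fake_predict(rows):
    """Hypothetical stand-in for a model: one 'prediction' per row."""
    return [row * 10 for row in rows]


def merge_in_order(shards, shard_predictions, num_rows):
    """Place each prediction back at the index of the row it came from."""
    merged = [None] * num_rows
    for indices, predictions in zip(shards, shard_predictions):
        for index, prediction in zip(indices, predictions):
            merged[index] = prediction
    return merged


data = [0, 1, 2, 3, 4]
shards = shard_indices(len(data), num_actors=2)
shard_predictions = [
    fake_predict([data[i] for i in indices]) for indices in shards
]
result = merge_in_order(shards, shard_predictions, len(data))
print(result)  # [0, 10, 20, 30, 40]
```

Simply concatenating the per-worker outputs would give a different order here, which is why the merge step tracks the original row indices.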
The RayParams object is used to configure various settings relating to distributed training.
Ray automatically detects GPUs on cluster nodes.
To start training on multiple GPUs, all you have to do is set the gpus_per_actor parameter of the RayParams object, as well as the num_actors parameter for multiple GPUs:
ray_params = RayParams(
    num_actors=4,
    gpus_per_actor=1,
)
This will train on four GPUs in parallel.
Note that it usually does not make sense to allocate more than one GPU per actor, as XGBoost relies on distributed libraries such as Dask or Ray for multi-GPU training.
XGBoost natively uses multithreading to speed up computations. Thus, if you are training on CPUs only, there is likely no benefit in using more than one actor per node. In that case, assuming you have a cluster of homogeneous nodes, set the number of CPUs per actor to the number of CPUs available on each node, and the number of actors to the number of nodes.
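As a small sketch of this rule for a homogeneous CPU-only cluster, the hypothetical helper `cpu_only_layout` below (not part of XGBoost-Ray) derives the two values you would pass as num_actors and cpus_per_actor.

```python
def cpu_only_layout(num_nodes, cpus_per_node):
    """One actor per node, each actor using all CPUs of its node."""
    return {"num_actors": num_nodes, "cpus_per_actor": cpus_per_node}


# Example: four identical nodes with 16 CPUs each.
layout = cpu_only_layout(num_nodes=4, cpus_per_node=16)
print(layout)  # {'num_actors': 4, 'cpus_per_actor': 16}
```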
If you are using multi-GPU training on a single node, divide the number of available CPUs evenly across all actors. For instance, if you have 16 CPUs and 4 GPUs available, each actor should get 1 GPU and 4 CPUs.
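The same arithmetic for the single-node multi-GPU case can be sketched as follows. The helper `multi_gpu_layout` is hypothetical, and it assumes one actor per GPU with any remainder CPUs left unused.

```python
def multi_gpu_layout(num_cpus, num_gpus):
    """One actor per GPU, with CPUs divided evenly across the actors."""
    if num_gpus < 1:
        raise ValueError("at least one GPU is required")
    return {
        "num_actors": num_gpus,
        "gpus_per_actor": 1,
        # Integer division: leftover CPUs are not assigned to any actor.
        "cpus_per_actor": num_cpus // num_gpus,
    }


gpu_layout = multi_gpu_layout(num_cpus=16, num_gpus=4)
print(gpu_layout)
# {'num_actors': 4, 'gpus_per_actor': 1, 'cpus_per_actor': 4}
```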
If you are using a cluster of heterogeneous nodes (with different numbers of CPUs), you might just want to use the greatest common divisor for the number of CPUs per actor. E.g. if you have a cluster of three nodes with 4, 8, and 12 CPUs, respectively, you'd start 6 actors with 4 CPUs each for maximum CPU utilization.
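The greatest-common-divisor rule can be worked through with the standard library. The helper name `heterogeneous_layout` is made up for this sketch.

```python
from functools import reduce
from math import gcd


def heterogeneous_layout(cpus_per_node):
    """Use the GCD of all node CPU counts as the CPUs per actor."""
    cpus_per_actor = reduce(gcd, cpus_per_node)
    # Every node then hosts a whole number of actors, so no CPU is idle.
    num_actors = sum(cpus_per_node) // cpus_per_actor
    return {"num_actors": num_actors, "cpus_per_actor": cpus_per_actor}


hetero_layout = heterogeneous_layout([4, 8, 12])
print(hetero_layout)  # {'num_actors': 6, 'cpus_per_actor': 4}
```

When the CPU counts share only a small divisor (for example 7 and 8, whose GCD is 1), this rule produces many single-CPU actors, so it is a starting point rather than a guarantee of a good layout.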
XGBoost-Ray supports two fault tolerance modes. In non-elastic training, whenever a training actor dies (e.g. because the node goes down), the training job will stop, XGBoost-Ray will wait for the actor (or its resources) to become available again (this might be on a different node), and then continue training once all actors are back.
In elastic training, whenever a training actor dies, the rest of the actors continue training without the dead actor. If the actor comes back, it will be re-integrated into training again.
Please note that in elastic training this means that you will train on less data for some time. The benefit is that you can continue training even if a node goes away for the remainder of the training run, and don't have to wait until it is back up again. In practice this usually leads to a very minor decrease in accuracy but a much shorter training time compared to non-elastic training.
Both training modes can be configured using the respective xgboost_ray.RayParams parameters.
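As a sketch of how the two modes might be selected, the keyword arguments below are intended for xgboost_ray.RayParams. The parameter names elastic_training, max_failed_actors, and max_actor_restarts are not named in this text; they are assumed from the XGBoost-Ray project documentation and should be checked against your installed version. The numeric values are arbitrary.

```python
# Assumed RayParams keyword arguments; verify names against your version.
non_elastic_kwargs = dict(
    num_actors=4,
    cpus_per_actor=2,
    elastic_training=False,  # wait for failed actors before continuing
    max_actor_restarts=2,    # assumed: restarts allowed during the run
)

elastic_kwargs = dict(
    num_actors=4,
    cpus_per_actor=2,
    elastic_training=True,   # keep training with the remaining actors
    max_failed_actors=1,     # assumed: actors allowed to be down at once
    max_actor_restarts=2,
)

# With xgboost_ray installed, these would be passed along as:
#   from xgboost_ray import RayParams
#   ray_params = RayParams(**elastic_kwargs)
print(sorted(elastic_kwargs))
```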
XGBoost-Ray integrates well with the hyperparameter optimization framework Ray Tune. Ray Tune uses Ray to start multiple distributed trials with different hyperparameter configurations. If used with XGBoost-Ray, these trials will then start their own distributed training jobs.
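Because every trial launches its own set of training actors, resource needs multiply quickly. The rough estimate below uses a hypothetical helper, `tune_cpu_budget`, and counts only the CPUs of the training actors; it deliberately ignores any extra resources Ray Tune may reserve for the trial itself.

```python
def tune_cpu_budget(num_actors, cpus_per_actor, concurrent_trials):
    """CPUs needed by the training actors of all concurrently running trials."""
    cpus_per_trial = num_actors * cpus_per_actor
    return cpus_per_trial * concurrent_trials


# Four trials at once, each with 4 actors using 1 CPU.
budget = tune_cpu_budget(num_actors=4, cpus_per_actor=1, concurrent_trials=4)
print(budget)  # 16
```

If the cluster has fewer CPUs than this estimate, fewer trials can run at the same time.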
XGBoost-Ray automatically reports evaluation results back to Ray Tune. There are only a few things you need to do:
- Put your XGBoost-Ray training call into a function accepting parameter configurations (train_model in the example below).
- Create a xgboost_ray.RayParams object (ray_params in the example below).
- Define the parameter search space (config dict in the example below).
- Call tune.run():
  - The metric parameter should contain the metric you'd like to optimize. Usually this consists of the prefix passed to the evals argument of xgboost_ray.train(), and an eval_metric passed in the XGBoost parameters (train-error in the example below).
  - The mode should either be min or max, depending on whether you'd like to minimize or maximize the metric.
  - The resources_per_trial should be set using ray_params.get_tune_resources(). This will make sure that each trial has the necessary resources available to start its distributed training jobs.
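The metric naming described above can be sketched in plain Python. The helper `tune_metric_names` is hypothetical, and the prefix-dash-metric convention is inferred from the train-error example in this text rather than taken from the library's source.

```python
def tune_metric_names(eval_prefixes, eval_metrics):
    """Combine each evals prefix with each eval_metric as 'prefix-metric'."""
    return [
        f"{prefix}-{metric}"
        for prefix in eval_prefixes
        for metric in eval_metrics
    ]


# evals=[(train_set, "train")] and "eval_metric": ["logloss", "error"]
names = tune_metric_names(["train"], ["logloss", "error"])
print(names)  # ['train-logloss', 'train-error']
```

Both logloss and error are metrics where lower is better, so mode would be min for either name.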
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer
num_actors = 4
num_cpus_per_actor = 1
ray_params = RayParams(
    num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)
def train_model(config):
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)
    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)
    bst.save_model("model.xgb")
from ray import tune
# Specify the hyperparameter search space.
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "eta": tune.loguniform(1e-4, 1e-1),
    "subsample": tune.uniform(0.5, 1.0),
    "max_depth": tune.randint(1, 9)
}
# Make sure to use the `get_tune_resources` method to set the `resources_per_trial`.
analysis = tune.run(
    train_model,
    config=config,
    metric="train-error",
    mode="min",
    num_samples=4,
    resources_per_trial=ray_params.get_tune_resources())
print("Best hyperparameters", analysis.best_config)
Ray Tune supports various search algorithms and libraries (e.g. BayesOpt, Tree-structured Parzen Estimators), smart schedulers like successive halving, and other features. Please refer to the Ray Tune documentation for more information.