## Distributed XGBoost with GPUs on Ray 

Ray provides a distributed version of XGBoost for data-parallel training. With drop-in replacements for native `xgboost` classes, XGBoost Ray lets you distribute training across a multi-node cluster.

This demo uses a dataset created by `00-create-dataset` with 100M rows, 100 feature columns, and 1 target column (5 classes) for multi-class classification. The dataset is ~40GiB.

`01a-train-with-GPUs` demonstrates in-core distributed training. This requires enough VRAM for roughly 2x the dataset size (i.e., 40GB dataset * 2 = 80GB VRAM). With A10G GPUs (24GB each), that means at least 4 GPUs (80GB / 24GB ≈ 3.3, rounded up).
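
The sizing rule above can be checked with a quick calculation. The sketch below is illustrative only: the 2x overhead factor and the 24GB per-GPU figure come from the text, while the helper name `gpus_needed` is hypothetical. It rounds up because a fractional GPU cannot be provisioned.

```python
import math


def gpus_needed(dataset_gb: float, vram_per_gpu_gb: float, overhead_factor: float = 2.0) -> int:
    """Minimum whole number of GPUs whose combined VRAM covers the working set."""
    required_vram_gb = dataset_gb * overhead_factor
    return math.ceil(required_vram_gb / vram_per_gpu_gb)


# 40GB dataset on A10G GPUs with 24GB of VRAM each.
a10g_count = gpus_needed(dataset_gb=40, vram_per_gpu_gb=24)
print(a10g_count)
```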

`01b-train-with-GPUs-External-Memory` (this notebook) demonstrates out-of-core distributed training. The advantage is that training needs less VRAM than the full dataset size. The cost is longer training times and the need to tune batch sizes so that the GPU and its VRAM stay well utilized.
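
One way to start tuning the batch size is to estimate how much memory a single batch occupies. The sketch below assumes every column is stored as a dense 4-byte or 8-byte numeric value, which may not match the actual table schema, and the helper `batch_size_gib` is hypothetical. It uses the 101 columns described above (100 features plus the target) and the batch size of `256*16_384` rows that this notebook passes to `iter_batches`.

```python
def batch_size_gib(num_rows: int, num_columns: int, bytes_per_value: int) -> float:
    """Dense in-memory size of one batch, in GiB."""
    return num_rows * num_columns * bytes_per_value / 2**30


rows_per_batch = 256 * 16_384  # batch size used later in this notebook
for dtype_name, width in [("float32", 4), ("float64", 8)]:
    gib = batch_size_gib(rows_per_batch, num_columns=101, bytes_per_value=width)
    print(f"{dtype_name}: {gib:.2f} GiB per batch")
```

Several batches can be resident at once (the notebook prefetches 2), so the peak footprint may be a multiple of this figure.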


#### Compute specifications to run this notebook
```json
{
    "num_workers": 8,
    "cluster_name": "Multi-node MLR w/ GPUs",
    "spark_version": "16.4.x-gpu-ml-scala2.13",
    "spark_conf": {
        "spark.task.resource.gpu.amount": "0",
        "spark.executor.memory": "1g"
    },
    "aws_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT_WITH_FALLBACK",
        "zone_id": "auto",
        "spot_bid_price_percent": 100,
        "ebs_volume_count": 0
    },
    "node_type_id": "g5.8xlarge",
    "driver_node_type_id": "g5.8xlarge",
    "autotermination_minutes": 60,
    "enable_elastic_disk": false,
    "single_user_name": "jon.cheung@databricks.com",
    "enable_local_disk_encryption": false,
    "data_security_mode": "SINGLE_USER",
    "runtime_engine": "STANDARD",
    "assigned_principal": "user:jon.cheung@databricks.com",
}
```

In [0]:
%pip install -qU ray[all] xgboost rmm-cu12

dbutils.library.restartPython()

In [0]:
num_training_rows = 100_000_000
num_training_columns = 100
num_labels = 5
catalog = "main"
schema = "ray_gtm_examples"

table = f"synthetic_data_{num_training_rows}_rows_{num_training_columns}_columns_{num_labels}_labels"
label = "target"
mlflow_experiment_name = "/Users/jon.cheung@databricks.com/ray_xgboost"

# If running in a multi-node cluster, this is where you
# should configure the run's persistent storage that is accessible
# across all worker nodes.
ray_xgboost_path = '/dbfs/Users/jon.cheung@databricks.com/ray_xgboost/' 
# This is for stashing the cluster logs
ray_logs_path = "/dbfs/Users/jon.cheung@databricks.com/ray_collected_logs/"

In [0]:
import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
import os

restart = True
if restart:
  # Best-effort cleanup: ignore errors if no cluster or Ray session is running.
  try:
    shutdown_ray_cluster()
  except Exception:
    pass
  try:
    ray.shutdown()
  except Exception:
    pass

# Set the parameters here so mlflow works properly at the Ray head + worker nodes
os.environ['DATABRICKS_HOST'] = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
os.environ['DATABRICKS_TOKEN'] = dbutils.secrets.get(scope = "development", key = "jon_cheung_PAT")

# The below configuration mirrors my Spark worker cluster set up. Change this to match your cluster configuration. 
setup_ray_cluster(
  min_worker_nodes=8,
  max_worker_nodes=8,
  num_cpus_worker_node=32,
  num_gpus_worker_node=1,
  num_cpus_head_node=32,
  num_gpus_head_node=1,
  collect_log_to_path=ray_logs_path
)

In [0]:
import cupy as cp
import xgboost
import rmm
from rmm.allocators.cupy import rmm_cupy_allocator

# It's important to use RMM for GPU-based external memory to improve performance.
# If XGBoost is not built with RMM support, a warning will be raised.
# We use the pool memory resource here for simplicity, you can also try the
# `ArenaMemoryResource` for improved memory fragmentation handling.
mr = rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource())
rmm.mr.set_current_device_resource(mr)
# Set the allocator for cupy as well.
cp.cuda.set_allocator(rmm_cupy_allocator)

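class RayDataIter(xgboost.core.DataIter):
    """Feeds batches from a Ray Data batch iterable to XGBoost as CuPy arrays."""

    def __init__(self, ray_iterator: ray.data.DataIterator, label_col: str):
        super().__init__()
        self.label_col = label_col
        self.iterator = ray_iterator
        self._generator = None

    def reset(self):
        # Called by XGBoost at the start of each pass over the data.
        self._generator = iter(self.iterator)

    def next(self, input_data):
        # Return False once the batches are exhausted to end the current pass.
        try:
            batch = next(self._generator)
        except StopIteration:
            return False

        # Move the batch to the GPU: the label column becomes y and all other
        # columns are stacked into the feature matrix X.
        y = cp.asarray(batch[self.label_col])
        X = cp.column_stack([cp.asarray(batch[col]) for col in batch if col != self.label_col])

        input_data(data=X, label=y)
        return True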
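
The `reset`/`next` contract that `RayDataIter` implements can be exercised without XGBoost or a GPU. The sketch below is a hypothetical pure-Python stand-in (`ListBatchIter` and `drain` are not part of any library, and the class does not subclass `xgboost.DataIter`). It only illustrates the calling pattern: `reset` rewinds to the first batch, and `next` either hands one batch to the `input_data` callback and returns `True`, or returns `False` when the data is exhausted.

```python
class ListBatchIter:
    """Stand-in that mirrors the reset()/next() protocol over in-memory batches."""

    def __init__(self, batches, label_col):
        self.batches = batches          # list of {column_name: list_of_values}
        self.label_col = label_col
        self._generator = None

    def reset(self):
        # Start a fresh pass over the data.
        self._generator = iter(self.batches)

    def next(self, input_data):
        try:
            batch = next(self._generator)
        except StopIteration:
            return False                # signals the end of one pass
        y = batch[self.label_col]
        feature_cols = [c for c in batch if c != self.label_col]
        # Turn per-column lists into per-row feature lists.
        X = [list(row) for row in zip(*(batch[c] for c in feature_cols))]
        input_data(data=X, label=y)
        return True


def drain(it):
    """Consume one full pass, the way a caller of the protocol would."""
    seen = []
    it.reset()
    while it.next(lambda data, label: seen.append((data, label))):
        pass
    return seen


batches = [
    {"f0": [1.0, 2.0], "f1": [3.0, 4.0], "target": [0, 1]},
    {"f0": [5.0], "f1": [6.0], "target": [2]},
]
it = ListBatchIter(batches, label_col="target")
first_pass = drain(it)
second_pass = drain(it)  # reset() makes a second pass possible
```

External-memory training generally iterates over the data more than once, which is why `reset` has to produce a fresh iterator rather than reuse an exhausted one.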

In [0]:
import ray
import ray.train
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback
import os


try: 
  ## Option 1 (PREFERRED): Build a Ray Dataset using a Databricks SQL Warehouse
  # Insert your SQL warehouse ID here. I've queried my 100M row dataset using a Small t-shirt sized cluster.

  # Ensure you've set the DATABRICKS_TOKEN so you can query using the warehouse compute
  ds = ray.data.read_databricks_tables(
    warehouse_id='2a72600bb68f00ee',
    catalog=catalog,
    schema=schema,
    query=f'SELECT * FROM {table}',
  )
  # Testing out read from UC. However, this requires turning off deletion vectors
  # ds = ray.data.read_unity_catalog(f'{catalog}.{schema}.{table}',
  #                                url='https://e2-demo-field-eng.cloud.databricks.com/',
  #                                token=os.environ['DATABRICKS_TOKEN'],
  #                                region='us-west-2',
  #                                reader_kwargs={"override_num_blocks": 1000})
  print('read via Databricks SQL warehouse')
except Exception:
  ## Option 2: Build a Ray Dataset from Parquet files
  # With too many Ray nodes, the warehouse read above can fail because of rate
  # limits. As a fallback, create Parquet files from the Delta table and build
  # the Ray Dataset from those. This is not the recommended route because, in
  # essence, it duplicates the data.
  parquet_path = f'/Volumes/{catalog}/{schema}/synthetic_data/{table}'
  ds = ray.data.read_parquet(parquet_path)
  print('read directly from parquet')

train_dataset, val_dataset = ds.train_test_split(test_size=0.25)



In [0]:
import xgboost
import ray.train
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback


def train_fn_per_worker(params: dict):
    """
    Trains an XGBoost model on a shard of the distributed dataset assigned to this worker.

    This should look very similar to a vanilla XGBoost training loop.

    This function is designed to be executed by individual Ray Train workers.
    It retrieves the training and validation data shards, converts them to DMatrix format,
    and performs a portion of the distributed XGBoost training. Ray Train handles
    the inter-worker communication.

    Args:
        params (dict): A dictionary of XGBoost training parameters, including
                       'num_estimators', 'eval_metric', and potentially other
                       XGBoost-specific parameters.
    """

    # Get dataset shards for this worker
    train_shard = ray.train.get_dataset_shard("train")
    val_shard = ray.train.get_dataset_shard("val")

    # Stream each shard in large batches through the RayDataIter wrapper.
    iterator = train_shard.iter_batches(batch_format="numpy", batch_size=256*16_384, prefetch_batches=2)
    streaming_iter = RayDataIter(iterator, label_col=label)

    val_iterator = val_shard.iter_batches(batch_format="numpy", batch_size=256*16_384, prefetch_batches=2)
    streaming_val_iter = RayDataIter(val_iterator, label_col=label)
    with xgboost.config_context(use_rmm=True):
        # External Quantile DMatrix (streams data, minimal memory usage, GPU-optimized)
        qdm_train = xgboost.ExtMemQuantileDMatrix(streaming_iter)
        # qdm_val = xgboost.DMatrix(streaming_val_iter, ref=qdm_train)
        qdm_val = xgboost.ExtMemQuantileDMatrix(streaming_val_iter, ref=qdm_train)

        # Do distributed data-parallel training.
        # Ray Train sets up the necessary coordinator processes and
        # environment variables for workers to communicate with each other.
        evals_results = {}
        bst = xgboost.train(
            params,
            dtrain=qdm_train,
            evals=[(qdm_val, "validation")],
            num_boost_round=params['num_estimators'],
            evals_result=evals_results,
            # early_stopping_rounds=params['early_stopping_rounds'],
            callbacks=[RayTrainReportCallback(metrics={params['eval_metric']: f"validation-{params['eval_metric']}"},
                                            frequency=1)],
        )


In [0]:
def train_driver_fn(config: dict, train_dataset, val_dataset):
    """
    Drives the distributed XGBoost training process using Ray Train.

    This function sets up the XGBoostTrainer, configures scaling (number of workers, GPU usage,
    and resources per worker), and initiates the distributed training by calling `trainer.fit()`.
    It also propagates metrics back to Ray Tune if integrated.

    Args:
        config (dict): A dictionary containing run-level hyperparameters such as
                       'num_workers', 'use_gpu', and a nested 'params' dictionary
                       for XGBoost training parameters.
        train_dataset: The Ray Dataset for training.
        val_dataset: The Ray Dataset for validation.

    Returns:
        None: The function reports metrics to Ray Tune but does not explicitly return a value.
              The trained model artifact is typically handled by Ray Train's checkpointing
              or by the `train_fn_per_worker` if saved directly.
    """
    # Unpack run-level hyperparameters.
    num_workers = config["num_workers"]
    use_gpu = config["use_gpu"]
    params = config['params']

    # Initialize the XGBoostTrainer, which orchestrates the distributed training using Ray.
    trainer = XGBoostTrainer(
      train_loop_per_worker=train_fn_per_worker, # The function to be executed on each worker
      train_loop_config=params,
      # By default Ray uses 1 GPU and 1 CPU per worker if resources_per_worker is not specified.
      # XGBoost is multi-threaded, so multiple CPUs can be assigned per worker, but not GPUs.
      scaling_config=ray.train.ScalingConfig(num_workers=num_workers, 
                                             use_gpu=use_gpu),
      datasets={"train": train_dataset, "val": val_dataset},  # Ray Datasets to be used by the trainer + workers
      run_config=ray.train.RunConfig(storage_path=ray_xgboost_path,                                  
                                    #  name=f"train-trial_id={ray.tune.get_context().get_trial_id()}")
      )
    )
                                    
    result = trainer.fit()
    
    # Propagate the final validation metric back up to Ray Tune.
    # RayTrainReportCallback reports it under the `eval_metric` name, so look it
    # up by that key instead of hard-coding 'mlogloss'.
    metric_name = params['eval_metric']
    ray.tune.report({metric_name: result.metrics[metric_name]})

In [0]:
# with test.checkpoint.as_directory() as checkpoint_dir:
#     model_path = os.path.join(checkpoint_dir, RayTrainReportCallback.CHECKPOINT_NAME)
#     print(model_path)
#     model = xgboost.Booster()
#     model.load_model(model_path)

## Ray Tune with Ray Train and MLflow Integration

https://docs.ray.io/en/latest/train/user-guides/hyperparameter-optimization.html#hyperparameter-tuning-with-ray-tune
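
When sizing the tuning run, it helps to know how many trials fit on the cluster at once. This is a minimal sketch, assuming each trial reserves a fixed number of GPU workers and that the Ray cluster started earlier exposes 9 GPUs (8 worker nodes plus the head node, 1 GPU each). The helper `max_concurrent` is hypothetical; the notebook itself reads the GPU count from `ray.cluster_resources()`.

```python
def max_concurrent(total_gpus, gpus_per_trial: int) -> int:
    """Number of trials that can hold their GPUs simultaneously."""
    if not total_gpus:
        # No "GPU" entry (None) or zero GPUs: nothing can be scheduled.
        return 0
    return int(total_gpus // gpus_per_trial)


print(max_concurrent(9.0, 1))  # one GPU worker per trial
print(max_concurrent(9.0, 2))  # two GPU workers per trial
```

Raising the number of GPU workers per trial speeds up each trial's data-parallel training but lowers how many trials can run side by side.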

In [0]:
from ray import tune
from ray.tune.tuner import Tuner
from ray.tune.search.optuna import OptunaSearch
from ray.air.integrations.mlflow import MLflowLoggerCallback


# Define resources per HPO trial and calculate max concurrent HPO trials
num_gpu_workers_per_trial = 1
num_hpo_trials = 8
resources = ray.cluster_resources()
total_cluster_gpus = resources.get("GPU") 
max_concurrent_trials = int(total_cluster_gpus // num_gpu_workers_per_trial)


# Define the hyperparameter search space.
# XGB sample hyperparameter configs
param_space = {
    "num_workers": num_gpu_workers_per_trial,
    "use_gpu": True,
    "params":{"objective": "multi:softmax",
              'eval_metric': 'mlogloss', 
              "tree_method": "hist",
              "device": "cuda",
              "num_class": num_labels,
              "learning_rate": tune.uniform(0.01, 0.3),
              "num_estimators": tune.randint(25, 50),
              'sampling_method': 'gradient_based',}
}

# Set up the search algorithm. Here we use Optuna with its default Bayesian sampler (i.e., TPE).
optuna = OptunaSearch(metric=param_space['params']['eval_metric'], 
                      mode="min")

# Set up Tuner job and run.
tuner = tune.Tuner(
    tune.with_parameters(train_driver_fn,
                         train_dataset = train_dataset,
                         val_dataset = val_dataset),
    run_config=tune.RunConfig(name='mlflow',
                              callbacks=[MLflowLoggerCallback(
                                  experiment_name=mlflow_experiment_name,
                                  save_artifact=True,
                                  log_params_on_trial_end=True)]
                              ),
    tune_config=tune.TuneConfig(num_samples=num_hpo_trials,
                                max_concurrent_trials=max_concurrent_trials,
                                search_alg=optuna,
                                ),
    param_space=param_space,

    )

results = tuner.fit()

best_params = results.get_best_result(metric=param_space['params']['eval_metric'], 
                        mode="min").config

print(best_params)
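
The best config keeps the nested shape of `param_space`, so the XGBoost settings sit under `params`, next to run-level keys such as `num_workers`. Below is a sketch of pulling them apart for a final training run. The helper `split_best_config` is hypothetical, the example values are made up for illustration, and it assumes `num_estimators` is this notebook's own key (used as `num_boost_round`) rather than a native XGBoost parameter.

```python
def split_best_config(config: dict):
    """Separate booster parameters from the boosting-round count."""
    booster_params = dict(config["params"])  # copy; leave the input intact
    num_boost_round = booster_params.pop("num_estimators")
    return booster_params, num_boost_round


# Illustrative values only, not results from an actual run.
example_config = {
    "num_workers": 1,
    "use_gpu": True,
    "params": {
        "objective": "multi:softmax",
        "eval_metric": "mlogloss",
        "learning_rate": 0.1,
        "num_estimators": 30,
    },
}
booster_params, num_boost_round = split_best_config(example_config)
print(num_boost_round, sorted(booster_params))
```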