In [None]:
!pip install -qU "ray[tune]" xgboost_ray

In [None]:
from typing import Tuple

import ray
from ray.train.xgboost import XGBoostPredictor
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig
from ray.data import Dataset
from ray.air.result import Result
from ray.data.preprocessors import StandardScaler

In [None]:
def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    """Load the breast-cancer CSV and derive the train/validation/test datasets.

    Returns:
        A ``(train, valid, test)`` tuple. ``train`` and ``valid`` come from a
        ``train_test_split`` call with ``test_size=0.3``; ``test`` is ``valid``
        with the ``"target"`` column dropped.
    """
    full_ds = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    train_split, valid_split = full_ds.train_test_split(test_size=0.3)
    # The test set is simply the validation split without its label column.
    unlabeled_split = valid_split.drop_columns(["target"])
    return train_split, valid_split, unlabeled_split

In [None]:
@ray.remote
def train_xgboost(num_workers: int, use_gpu: bool = False) -> Result:
    """Ray task that fits an XGBoost model on the prepared datasets.

    Args:
        num_workers: Forwarded to ``ScalingConfig`` as the number of workers.
        use_gpu: Forwarded to ``ScalingConfig``; defaults to ``False``.

    Returns:
        The ``Result`` returned by ``XGBoostTrainer.fit()``. Its metrics are
        also printed before returning.
    """
    # Only the labelled splits are needed here; the unlabelled test split is ignored.
    train_split, valid_split, _ = prepare_data()

    # Standardize a couple of (arbitrarily chosen) feature columns.
    scaler = StandardScaler(columns=["mean radius", "mean texture"])

    # XGBoost-specific parameters.
    xgb_params = dict(
        tree_method="approx",
        objective="binary:logistic",
        eval_metric=["logloss", "error"],
    )

    run_scaling = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)
    fit_result = XGBoostTrainer(
        scaling_config=run_scaling,
        label_column="target",
        params=xgb_params,
        datasets={"train": train_split, "valid": valid_split},
        preprocessor=scaler,
        num_boost_round=100,
    ).fit()
    print(fit_result.metrics)

    return fit_result


In [None]:
import pandas as pd
from ray.air import Checkpoint
from ray.data import ActorPoolStrategy


class Predict:
    """Callable that scores batches with an XGBoost predictor restored from a checkpoint."""

    def __init__(self, checkpoint: Checkpoint):
        """Build the predictor from ``checkpoint`` via ``XGBoostPredictor.from_checkpoint``."""
        self.predictor = XGBoostPredictor.from_checkpoint(checkpoint)

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        """Return the predictor's output for ``batch``."""
        predictions = self.predictor.predict(batch)
        return predictions

@ray.remote
def predict_xgboost(result: Result, threshold: float = 0.5):
    """Ray task that scores the test dataset and prints the predicted labels.

    Args:
        result: Training result whose ``checkpoint`` is handed to ``Predict``.
        threshold: Scores strictly greater than this value are labelled ``1``,
            all others ``0``. Defaults to ``0.5``.

    Returns:
        None. The labels are displayed through ``Dataset.show()``.
    """
    _, _, test_dataset = prepare_data()

    # `Predict` is a class, so it is instantiated with the checkpoint through
    # `fn_constructor_args` and run with an actor-pool compute strategy.
    scores = test_dataset.map_batches(
        Predict,
        fn_constructor_args=[result.checkpoint],
        compute=ActorPoolStrategy(),
        batch_format="pandas",
    )

    # Turn the scores into 0/1 labels by thresholding.
    predicted_labels = scores.map_batches(
        lambda df: (df > threshold).astype(int),
        batch_format="pandas",
    )
    print("PREDICTED LABELS")
    predicted_labels.show()

In [None]:
# Drop any existing Ray connection first, then connect to the cluster head
# service. The runtime environment requests the `xgboost` and `xgboost_ray`
# pip packages.
ray.shutdown()
ray.init(
    address="ray://raycluster-kuberay-head-svc.default.svc.cluster.local:10001",
    runtime_env={"pip": ["xgboost", "xgboost_ray"]},
)

In [None]:
# Launch training as a Ray task with two workers and no GPU. `.remote()` does
# not block: `result` is the task's handle (a Ray object reference), not the
# `Result` itself.
result = train_xgboost.remote(num_workers=2, use_gpu=False)

In [None]:
# Block until the prediction task has finished. Without `ray.get`, the cell
# returns immediately and any failure in prediction (or in the training task
# whose output it consumes) would never be raised in the notebook.
ray.get(predict_xgboost.remote(result))