# Training XGBoost with Dask RAPIDS in Databricks

This notebook shows how to deploy a Dask RAPIDS workflow in Databricks. We will focus on the HIGGS dataset, a moderately sized classification problem from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/dataset/280/higgs).

In the following sections, we start by loading data from Delta Lake and preprocessing it with Dask. We then train an XGBoost model on the resulting data with different configurations. Lastly, we run inference with the trained model.


## Launch multi-node Dask Cluster

This example workflow runs on GPUs, and you don't need a GPU locally because Databricks can provide one for you. Dask, in turn, lets users distribute or scale up computation within a single GPU or across multiple GPUs.

Dask recently introduced [**dask-databricks**](https://github.com/dask-contrib/dask-databricks) (available via [conda](https://github.com/conda-forge/dask-databricks-feedstock) and [pip](https://pypi.org/project/dask-databricks/)). With this CLI tool, the `dask databricks run --cuda` command launches a Dask scheduler on the driver node and CUDA workers on the remaining nodes.

At a high level, this section breaks down into the following steps:
* Create a new init script that installs RAPIDS and runs `dask-databricks`, which launches a Dask cluster alongside Spark on Databricks
* Create a new multi-node cluster that uses the init script
* Once the cluster is running, upload this notebook to Databricks and continue running these cells there

```{docref} /platforms/databricks
For more detailed information on launching Dask-RAPIDS in Databricks see the documentation.
```


## Import packages 

Once your cluster has launched, list the installed packages to confirm the environment, then import the necessary libraries and dependencies.

In [None]:
pip list

Package                      Version
---------------------------- -------------
absl-py                      1.4.0
asttokens                    2.2.1
astunparse                   1.6.3
backcall                     0.2.0
blinker                      1.4
bokeh                        3.2.2
cachetools                   5.3.1
certifi                      2023.7.22
charset-normalizer           3.2.0
click                        8.1.7
cloudpickle                  3.0.0
comm                         0.1.3
contourpy                    1.1.0
cryptography                 3.4.8
cubinlinker-cu11             0.3.0.post1
cuda-python                  11.8.3
cudf-cu11                    23.10.2
cuml-cu11                    23.10.0
cupy-cuda11x                 12.3.0
cycler                       0.11.0
dask                         2023.9.2
dask-cuda                    23.10.0
dask-cudf-cu11               23.10.2
dask-databricks              0.3.0
dask-deltatable              0.3.1
dask-glm               

In [None]:
import os
from time import time
from typing import Tuple

import pandas as pd
import numpy as np
import cupy
import cudf
import dask
import dask.dataframe as dd
import dask_cudf
import dask_databricks
import dask_deltatable as ddt
import xgboost as xgb
from xgboost import dask as dxgb
from dask_ml.model_selection import train_test_split
from distributed import wait



## Connect to Dask Client

Connect to the Dask client (and, optionally, open the dashboard) to submit tasks.

In [None]:
client = dask_databricks.get_client()
client

Connection method: Cluster object
Cluster type: dask_databricks.DatabricksCluster
Dashboard: https://dbc-dp-8721196619973675.cloud.databricks.com/driver-proxy/o/8721196619973675/1031-230718-l2ubf858/8087/status

Dashboard: https://dbc-dp-8721196619973675.cloud.databricks.com/driver-proxy/o/8721196619973675/1031-230718-l2ubf858/8087/status
Workers: 2
Total threads: 2
Total memory: 30.65 GiB

Comm: tcp://10.59.240.185:8786
Workers: 2
Dashboard: http://10.59.240.185:8087/status
Total threads: 2
Started: 15 minutes ago
Total memory: 30.65 GiB

Comm: tcp://10.59.238.201:35351
Total threads: 1
Dashboard: http://10.59.238.201:34953/status
Memory: 15.33 GiB
Nanny: tcp://10.59.238.201:36151
Local directory: /tmp/dask-scratch-space/worker-s8wdpnav
GPU: Tesla T4
GPU memory: 15.00 GiB

Comm: tcp://10.59.246.244:35789
Total threads: 1
Dashboard: http://10.59.246.244:33581/status
Memory: 15.33 GiB
Nanny: tcp://10.59.246.244:35785
Local directory: /tmp/dask-scratch-space/worker-h0n7o6rt
GPU: Tesla T4
GPU memory: 15.00 GiB



## Download dataset

First we download the dataset into `/data` in the current directory or into the Databricks File System (DBFS). Alternatively, you could use cloud storage (S3, Google Cloud, Azure Data Lake). Refer to the [docs](https://docs.databricks.com/en/storage/index.html#:~:text=Databricks%20uses%20cloud%20object%20storage,storage%20locations%20in%20your%20account.) for more information.

Uncomment the commands in the next three cells to download the dataset to your chosen location. Run them only once.


In [None]:
# %fs mkdirs dbfs:/databricks/rapids

In [None]:
# !curl https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz --output /dbfs/databricks/rapids/HIGGS.csv.gz

In [None]:
# # unzip compressed file into .csv file
# !gunzip "/dbfs/databricks/rapids/HIGGS.csv.gz"

Next we load the data into GPUs. Because the data is loaded multiple times during parameter tuning, we convert the original CSV file into Parquet format for better performance. This can be done easily with a Delta table, as shown in the next steps.

## Integrating Dask and Delta Lake

[Delta Lake](https://docs.databricks.com/en/delta/index.html) is an optimized storage layer within the Databricks lakehouse that provides a foundational platform for storing data and tables. This open-source software extends Parquet data files by incorporating a file-based transaction log to support [ACID transactions](https://docs.databricks.com/en/lakehouse/acid.html) and scalable metadata handling. 

Delta Lake is the default storage format for all operations on Databricks; that is, unless otherwise specified, all tables on Databricks are Delta tables.
Check out the [tutorial](https://docs.databricks.com/en/delta/tutorial.html) for examples of basic Delta Lake operations.
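
To make the idea of a file-based transaction log more concrete, here is a minimal sketch that replays a toy log and works out which data files are currently part of the table. It assumes the commonly documented layout (a `_delta_log` directory of zero-padded, newline-delimited JSON commit files containing `add` and `remove` actions) and ignores checkpoints and every other action type, so it is an illustration rather than a real Delta reader. The helper names `active_files` and `write_commit` are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def active_files(table_path):
    """Replay the JSON commits in _delta_log and return the live data files."""
    log_dir = Path(table_path) / "_delta_log"
    live = set()
    # Commit file names are zero-padded, so lexical order equals commit order.
    for commit in sorted(log_dir.glob("*.json")):
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return sorted(live)


def write_commit(table_path, version, actions):
    """Write one toy commit file (hypothetical helper, for this demo only)."""
    log_dir = Path(table_path) / "_delta_log"
    log_dir.mkdir(parents=True, exist_ok=True)
    lines = [json.dumps(action) for action in actions]
    (log_dir / f"{version:020d}.json").write_text("\n".join(lines) + "\n")


with tempfile.TemporaryDirectory() as demo_table:
    # Commit 0 adds two Parquet files; commit 1 replaces one of them.
    write_commit(demo_table, 0, [{"add": {"path": "part-0.parquet"}},
                                 {"add": {"path": "part-1.parquet"}}])
    write_commit(demo_table, 1, [{"remove": {"path": "part-0.parquet"}},
                                 {"add": {"path": "part-2.parquet"}}])
    live_files = active_files(demo_table)

print(live_files)
```

Because readers only trust files listed by the log, a rewrite becomes visible atomically when its commit file appears, which is the basis of the ACID guarantees mentioned above.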

Let's explore step by step how we can leverage Delta Lake tables with Dask to accelerate data preprocessing with RAPIDS.

## Read from Delta table with Dask

With Dask's [**dask-deltatable**](https://github.com/dask-contrib/dask-deltatable/tree/main), we can write the `.csv` file into a Delta table, then read and parallelize it with Dask.

Calling `dask_deltatable.read_deltalake()` returns a Dask DataFrame. However, our objective is to use GPU acceleration for the entire ML pipeline, including data processing, model training, and inference. For this reason, we convert the Dask DataFrame into a Dask cuDF DataFrame using `dask_cudf.from_dask_dataframe()`.

In [None]:
def write_read_deltatable_with_dask(csv_path, delta_path):
    """
    1. Write csv file into a Delta table
    2. Read from the Delta table using Dask
    3. Convert returned dask dataframe to Dask cuDF DataFrame.

    Parameters:
    - csv_path: The DBFS path to HIGGS.csv dataset.
    - delta_path : the delta lake path to write the dataset

    Returns:
    - ddf: cuDF Dask DataFrame representing the data from the Delta table.
    """
    # Write csv file to Delta table
    data = dd.read_csv(csv_path, header=0)
    ddt.to_deltalake(delta_path, data)

    # Read from Delta table using Dask, returns dask dataframe
    df = ddt.read_deltalake(delta_path)

    # Convert Dask DataFrame to Dask cuDF for GPU acceleration
    ddf = dask_cudf.from_dask_dataframe(df)

    return ddf

In [None]:
csv_path = "/dbfs/databricks/rapids/HIGGS.csv"
delta_path = "/dbfs/databricks/rapids/higgs_table"

ddf = write_read_deltatable_with_dask(csv_path=csv_path, delta_path=delta_path)
ddf

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
File <command-2663225126019140>:4
      1 csv_path = "/dbfs/databricks/rapids/HIGGS.csv"
      2 delta_path = "/dbfs/databricks/rapids/higgs_table"
----> 4 ddf = write_read_deltatable_with_dask(csv_path=csv_path, delta_path=delta_path)
      5 ddf

File <command-4456799276724339>:16, in write_read_deltatable_with_dask(csv_path, delta_path)
     14 # Write csv file to Delta table
     15 data = dd.read_csv(csv_path, header=0)
---> 16 ddt

In [None]:
ddf.shape

In [None]:
colnames = ["partition"] + ["label"] + ["feature-%02d" % i for i in range(1, 29)]
ddf.columns = colnames
ddf.head()

## Split data

In the preceding step, we used `dask-cudf` to load data from the Delta table. Now we use the `train_test_split()` function from `dask-ml` to split the dataset.

Most of the time, the GPU backend of Dask works seamlessly with utilities in `dask-ml`, so we can accelerate the entire ML pipeline:


In [None]:
def load_higgs(
    ddf,
) -> Tuple[
    dask_cudf.core.DataFrame,
    dask_cudf.core.Series,
    dask_cudf.core.DataFrame,
    dask_cudf.core.Series,
]:
    y = ddf["label"]
    X = ddf[ddf.columns.difference(["label"])]

    X_train, X_valid, y_train, y_valid = train_test_split(
        X, y, test_size=0.33, random_state=42
    )
    X_train, X_valid, y_train, y_valid = client.persist(
        [X_train, X_valid, y_train, y_valid]
    )
    wait([X_train, X_valid, y_train, y_valid])

    return X_train, X_valid, y_train, y_valid

In [None]:
X_train, X_valid, y_train, y_valid = load_higgs(ddf)

In [None]:
X_train.head()

In [None]:
y_train.head()
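
As a rough illustration of what `test_size` and `random_state` control in the split above, the following sketch assigns row indices to a training or validation set with a seeded random number generator. This is a simplified stand-in written with the standard library, not how `dask-ml` implements `train_test_split()`; the function name `random_split_indices` is hypothetical.

```python
import random


def random_split_indices(n_rows, test_size, seed):
    """Assign each row index to train or validation using a seeded RNG."""
    rng = random.Random(seed)
    train_idx, valid_idx = [], []
    for i in range(n_rows):
        # Each row lands in the validation set with probability `test_size`.
        if rng.random() < test_size:
            valid_idx.append(i)
        else:
            train_idx.append(i)
    return train_idx, valid_idx


n_rows = 10_000
train_idx, valid_idx = random_split_indices(n_rows, test_size=0.33, seed=42)
# Re-running with the same seed reproduces the same split.
train_again, valid_again = random_split_indices(n_rows, test_size=0.33, seed=42)
valid_fraction = len(valid_idx) / n_rows
print(f"validation fraction: {valid_fraction:.3f}")
```

The validation fraction is only approximately 0.33 because each row is assigned independently, and fixing the seed is what makes the split repeatable across runs.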

## Model training

There are two things to notice here. First, we specify the number of rounds that triggers early stopping. XGBoost stops training once the validation metric fails to improve for X consecutive rounds, where X is the number of rounds specified for early stopping.

Second, we use a data type called `DaskDeviceQuantileDMatrix` for training but `DaskDMatrix` for validation. `DaskDeviceQuantileDMatrix` is a drop-in replacement for `DaskDMatrix` for GPU-based training inputs that avoids extra data copies. Note that XGBoost 1.7 and later deprecate `DaskDeviceQuantileDMatrix` in favor of `DaskQuantileDMatrix`, so you may need to adjust the class name depending on your installed version.

In [None]:
def fit_model_es(client, X, y, X_valid, y_valid) -> dxgb.Booster:
    early_stopping_rounds = 5
    Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
    Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
    # train the model
    booster = dxgb.train(
        client,
        {
            "objective": "binary:logistic",
            "eval_metric": "error",
            "tree_method": "gpu_hist",
        },
        Xy,
        evals=[(Xy_valid, "Valid")],
        num_boost_round=1000,
        early_stopping_rounds=early_stopping_rounds,
    )["booster"]
    return booster

In [None]:
booster = fit_model_es(client, X=X_train, y=y_train, X_valid=X_valid, y_valid=y_valid)
booster
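
The early-stopping rule described in this section can be sketched in a few lines of plain Python. This is an illustration of the rule for a metric that should be minimized, such as `error`, and not XGBoost's own implementation; the function name `early_stopping_round` and the list of validation errors are made up for the example.

```python
def early_stopping_round(valid_errors, rounds):
    """Return (stop_iteration, best_iteration) for a metric to be minimized.

    Training stops at the first iteration where the metric has failed to
    improve for `rounds` consecutive iterations.
    """
    best_score = float("inf")
    best_iter = 0
    for i, score in enumerate(valid_errors):
        if score < best_score:
            # Strict improvement resets the patience counter.
            best_score, best_iter = score, i
        elif i - best_iter >= rounds:
            return i, best_iter
    # The metric kept improving often enough, so all rounds were used.
    return len(valid_errors) - 1, best_iter


# Toy validation errors per boosting round (not real training output).
errors = [0.40, 0.35, 0.33, 0.31, 0.32, 0.32, 0.33, 0.31, 0.34, 0.30]
stop_iter, best_iter = early_stopping_round(errors, rounds=5)
print(stop_iter, best_iter)
```

In this toy run the best score occurs at iteration 3, and training stops at iteration 8 after five rounds without improvement, so the lower error at iteration 9 is never reached. That trade-off is why the number of early-stopping rounds is worth tuning.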

## Train with Customized objective and evaluation metric

In the example below, the XGBoost model is trained with a custom logistic-regression objective function (`logit`) and a custom evaluation metric (`error`), along with early stopping.

Note that the objective function returns both the gradient and the Hessian, which XGBoost uses to optimize the model. Also, the `metric_name` parameter must be specified in the early-stopping callback, and its value must match the name returned by the custom metric (`CustomErr` here). It tells XGBoost to use the custom error metric when evaluating the early-stopping criterion.

In [None]:
def fit_model_customized_objective(client, X, y, X_valid, y_valid) -> dxgb.Booster:
    def logit(predt: np.ndarray, Xy: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
        predt = 1.0 / (1.0 + np.exp(-predt))
        labels = Xy.get_label()
        grad = predt - labels
        hess = predt * (1.0 - predt)
        return grad, hess

    def error(predt: np.ndarray, Xy: xgb.DMatrix) -> Tuple[str, float]:
        label = Xy.get_label()
        r = np.zeros(predt.shape)
        predt = 1.0 / (1.0 + np.exp(-predt))
        gt = predt > 0.5
        r[gt] = 1 - label[gt]
        le = predt <= 0.5
        r[le] = label[le]
        return "CustomErr", float(np.average(r))

    # Use early stopping with custom objective and metric.
    early_stopping_rounds = 5
    # Specify the metric we want to use for early stopping.
    es = xgb.callback.EarlyStopping(
        rounds=early_stopping_rounds, save_best=True, metric_name="CustomErr"
    )

    Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
    Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
    booster = dxgb.train(
        client,
        {"eval_metric": "error", "tree_method": "gpu_hist"},
        Xy,
        evals=[(Xy_valid, "Valid")],
        num_boost_round=1000,
        obj=logit,  # pass the custom objective
        feval=error,  # pass the custom metric
        callbacks=[es],
    )["booster"]
    return booster

In [None]:
booster_custom = fit_model_customized_objective(
    client, X=X_train, y=y_train, X_valid=X_valid, y_valid=y_valid
)
booster_custom
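
To see where the gradient and Hessian in `logit` come from, the sketch below checks the two formulas against finite differences of the logistic loss for a single example. It uses only the standard library; the helper names and the sample margin and label are chosen for illustration.

```python
import math


def sigmoid(margin):
    return 1.0 / (1.0 + math.exp(-margin))


def log_loss(margin, label):
    """Negative log-likelihood of one example as a function of the raw margin."""
    p = sigmoid(margin)
    return -(label * math.log(p) + (1.0 - label) * math.log(1.0 - p))


def logit_grad_hess(margin, label):
    """Same formulas as the `logit` objective, applied to a single example."""
    p = sigmoid(margin)
    return p - label, p * (1.0 - p)


margin, label, eps = 0.7, 1.0, 1e-4
grad, hess = logit_grad_hess(margin, label)

# Central finite differences of the loss with respect to the margin.
num_grad = (log_loss(margin + eps, label) - log_loss(margin - eps, label)) / (2 * eps)
num_hess = (
    log_loss(margin + eps, label)
    - 2 * log_loss(margin, label)
    + log_loss(margin - eps, label)
) / eps**2

print(grad, num_grad)
print(hess, num_hess)
```

The analytic and numerical values agree closely, which is a quick sanity check worth running whenever you write a custom objective of your own.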

## Running inference

After some tuning, we arrive at the final model for performing inference on new data. 



In [None]:
def predict(client, model, X):
    predt = dxgb.predict(client, model, X)
    return predt

In [None]:
preds = predict(client, booster, X_train)
preds.head()
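
With the `binary:logistic` objective, the predictions are probabilities of the positive class. A minimal sketch of turning them into class labels and an error rate, using the same 0.5 threshold as the `error` metric, is shown below. The probabilities and labels are toy values, not output from the model above, and the helper names are hypothetical.

```python
def to_labels(probabilities, threshold=0.5):
    """Map predicted probabilities to 0/1 class labels."""
    return [1 if p > threshold else 0 for p in probabilities]


def error_rate(predicted, actual):
    """Fraction of examples whose predicted label differs from the true label."""
    wrong = sum(1 for p, a in zip(predicted, actual) if p != a)
    return wrong / len(actual)


probs = [0.91, 0.12, 0.55, 0.40, 0.73]  # toy values
truth = [1, 0, 0, 0, 1]

labels = to_labels(probs)
err = error_rate(labels, truth)
print(labels, err)
```

On real data the same thresholding can be applied to the collected predictions, and the threshold itself can be moved away from 0.5 if false positives and false negatives carry different costs.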

## Clean up

When finished, be sure to terminate your cluster to avoid incurring extra costs for idle resources. If you forget to terminate it manually, Databricks clusters automatically time out after the inactivity period specified during cluster creation.

In [None]:
client.close()