In [1]:
import time
import os
import dask
import cupy
import xgboost as xgb
import dask.array as da
from distributed import Client
from dask_hip import LocalHIPCluster
from xgboost.dask import DaskDMatrix

In [2]:
def main(client, n=1_000, m=3_500_000, partition_size=10_000,
         num_boost_round=8, seed=None):
    """Train a GPU XGBoost model on synthetic Dask data and predict with it.

    Random normal features of shape ``(m, n)`` and targets of shape ``(m,)``
    are generated through a CuPy-backed Dask random state, wrapped in a
    ``DaskDMatrix``, used to train a booster with ``xgb.dask.train`` and then
    passed to ``xgb.dask.predict``.  Train and predict wall times and the
    evaluation history are printed.

    Parameters
    ----------
    client : distributed.Client
        Client connected to the Dask cluster that runs the work.
    n : int, default 1_000
        Number of feature columns.
    m : int, default 3_500_000
        Number of rows.
    partition_size : int, default 10_000
        Chunk size handed to the Dask random generators.  For a smaller
        square problem try ``main(client, n=50_000, m=50_000,
        partition_size=1_000)``.
    num_boost_round : int, default 8
        Number of boosting rounds.
    seed : int or None, default None
        Seed for the Dask random state.  ``None`` keeps the original
        unseeded behaviour; pass an integer for reproducible data.

    Returns
    -------
    The object returned by ``xgb.dask.predict`` for the training matrix.

    Raises
    ------
    ValueError
        If ``n``, ``m``, ``partition_size`` or ``num_boost_round`` is
        smaller than 1.
    """
    # Fail early with a clear message instead of deep inside dask/xgboost.
    sizes = {
        "n": n,
        "m": m,
        "partition_size": partition_size,
        "num_boost_round": num_boost_round,
    }
    for name, value in sizes.items():
        if value < 1:
            raise ValueError(f"{name} must be a positive integer, got {value!r}")

    # Generate some random data for demonstration.
    rs = da.random.RandomState(seed=seed, RandomState=cupy.random.RandomState)
    x = rs.normal(size=(m, n), chunks=partition_size)
    y = rs.normal(size=(m,), chunks=partition_size)

    # DaskDMatrix acts like a normal DMatrix; it works as a proxy for the local
    # DMatrix objects scattered around the workers.
    dtrain = DaskDMatrix(client, x, y)

    params = {
        "verbosity": 2,
        "tree_method": "hist",
        "device": "gpu",
    }

    # Use the train method from xgboost.dask instead of xgboost.  This
    # distributed version of train returns a dictionary containing the
    # resulting booster and the evaluation history obtained from the
    # evaluation metrics.
    start = time.perf_counter()  # monotonic clock: safe for measuring intervals
    output = xgb.dask.train(
        client,
        params,
        dtrain,
        num_boost_round=num_boost_round,
        evals=[(dtrain, "train")],
    )
    end = time.perf_counter()
    print(f'Train time: {end - start:.4f} secs')
    bst = output["booster"]
    history = output["history"]

    # You can pass `output` directly into `predict` too.
    # NOTE(review): the prediction is a Dask collection, so this interval may
    # not include fully materialising the results -- confirm before quoting it
    # as inference latency.
    start = time.perf_counter()
    prediction = xgb.dask.predict(client, bst, dtrain)
    end = time.perf_counter()
    print(f'Eval time: {end - start:.4f} secs')
    print("Evaluation history:", history)
    return prediction

In [6]:
# Set up the Dask cluster and client, and display the cluster summary
# (which includes the Dask dashboard link).
# HIP_VISIBLE_DEVICES lists the GPUs handed to the cluster; the recorded run
# started four workers for the four listed devices. Adjust the list to match
# the GPUs available on this machine.

cluster = LocalHIPCluster(HIP_VISIBLE_DEVICES='0,1,2,3')
# Connect a client to the cluster; it is passed to `main` to run the demo.
client = Client(cluster)
# Bare last expression so the notebook renders the cluster summary.
cluster

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:42837
INFO:distributed.scheduler:  dashboard at:            127.0.0.1:8787
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38555'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35387'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46221'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46631'
2023-11-09 03:52:57,616 - distributed.preloading - INFO - Creating preload: dask_hip.initialize
2023-11-09 03:52:57,616 - distributed.preloading - INFO - Import preload module: dask_hip.initialize
2023-11-09 03:52:57,649 - distributed.preloading - INFO - Creating preload: dask_hip.initialize
2023-11-09 03:52:57,649 - distributed.preloading - INFO - Import preload module: dask_hip.initialize
2023-11-09 03:52:57,653 - distributed.preloading - INFO - Creating preload: dask_hip.initialize
2023-11-09 03:52:57,653 - distributed.preloading 

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 503.72 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:42837,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 503.72 GiB

0,1
Comm: tcp://127.0.0.1:40835,Total threads: 1
Dashboard: http://127.0.0.1:37889/status,Memory: 125.93 GiB
Nanny: tcp://127.0.0.1:38555,
Local directory: /tmp/dask-worker-space/worker-x1kp2xt8,Local directory: /tmp/dask-worker-space/worker-x1kp2xt8
GPU: Instinct MI210,GPU memory: 63.98 GiB

0,1
Comm: tcp://127.0.0.1:41215,Total threads: 1
Dashboard: http://127.0.0.1:46683/status,Memory: 125.93 GiB
Nanny: tcp://127.0.0.1:35387,
Local directory: /tmp/dask-worker-space/worker-n32s78xl,Local directory: /tmp/dask-worker-space/worker-n32s78xl
GPU: Instinct MI210,GPU memory: 63.98 GiB

0,1
Comm: tcp://127.0.0.1:43915,Total threads: 1
Dashboard: http://127.0.0.1:39179/status,Memory: 125.93 GiB
Nanny: tcp://127.0.0.1:46221,
Local directory: /tmp/dask-worker-space/worker-85skja1z,Local directory: /tmp/dask-worker-space/worker-85skja1z
GPU: Instinct MI210,GPU memory: 63.98 GiB

0,1
Comm: tcp://127.0.0.1:35843,Total threads: 1
Dashboard: http://127.0.0.1:44143/status,Memory: 125.93 GiB
Nanny: tcp://127.0.0.1:46631,
Local directory: /tmp/dask-worker-space/worker-ce8lhn0b,Local directory: /tmp/dask-worker-space/worker-ce8lhn0b
GPU: Instinct MI210,GPU memory: 63.98 GiB


In [7]:

# Run the demo on the cluster through the Dask client. `main` prints the train
# and eval timings plus the evaluation history; its return value (the
# prediction) is shown as the cell output.
main(client)


INFO:distributed.worker:Run out-of-band function '_start_tracker'
INFO:distributed.scheduler:Receive client connection: Client-worker-7f516713-7eb3-11ee-95d4-368d33f8b47c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39738
INFO:distributed.scheduler:Receive client connection: Client-worker-7f513e50-7eb3-11ee-95d1-368d33f8b47c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39740
INFO:distributed.scheduler:Receive client connection: Client-worker-7f518f1c-7eb3-11ee-95d7-368d33f8b47c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39746
INFO:distributed.scheduler:Receive client connection: Client-worker-7f51f9f6-7eb3-11ee-95d8-368d33f8b47c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39752
[03:53:09] task [xgboost.dask-0]:tcp://127.0.0.1:40835 got new rank 0
[03:53:09] task [xgboost.dask-1]:tcp://127.0.0.1:41215 got new rank 1
[03:53:09] task [xgboost.dask-2]:tcp://127.0.0.1:43915 got

[0]	train-rmse:0.99973
[1]	train-rmse:0.99965
[2]	train-rmse:0.99957
[3]	train-rmse:0.99949
[4]	train-rmse:0.99941
[5]	train-rmse:0.99934
[6]	train-rmse:0.99925
[7]	train-rmse:0.99917
Train time: 2.8448 secs
Eval time: 0.9371 secs
Evaluation history: {'train': OrderedDict([('rmse', [0.9997349441670412, 0.9996505397780301, 0.9995735080402275, 0.9994889373917455, 0.9994081643112612, 0.999335568966786, 0.9992539293806494, 0.9991717997763842])])}


Unnamed: 0,Array,Chunk
Bytes,13.35 MiB,39.06 kiB
Shape,"(3500000,)","(10000,)"
Dask graph,350 chunks in 701 graph layers,350 chunks in 701 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 13.35 MiB 39.06 kiB Shape (3500000,) (10000,) Dask graph 350 chunks in 701 graph layers Data type float32 numpy.ndarray",3500000  1,

Unnamed: 0,Array,Chunk
Bytes,13.35 MiB,39.06 kiB
Shape,"(3500000,)","(10000,)"
Dask graph,350 chunks in 701 graph layers,350 chunks in 701 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [8]:
# Don't forget to clean up after the run so the worker processes started by
# the cluster do not linger: shut down through the client first, then close
# the local cluster object.

client.shutdown()
cluster.close()


INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:38555'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:35387'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:46221'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:46631'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.scheduler:Remove client Client-worker-7f513e50-7eb3-11ee-95d1-368d33f8b47c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:39740; closing.
INFO:distributed.scheduler:Remove client Client-worker-7f516713-7eb3-11ee-95d4-368d33f8b47c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:39738; closing.
INFO:distributed.s