In [1]:
# Optional: uncomment the line below to list the GPUs on this machine before running the cells.
# !nvidia-smi

## Single GPU

In [2]:
from cuml.cluster import DBSCAN
import cudf

# A tiny three-row, three-feature toy frame, built column-wise in one call.
toy_columns = {
    '0': [1.0, 2.0, 5.0],
    '1': [4.0, 2.0, 1.0],
    '2': [4.0, 2.0, 1.0],
}
gdf_float = cudf.DataFrame(toy_columns)

# Cluster the points on the GPU. With min_samples=1 every point is a core point;
# the printed labels show each point ending up in its own cluster at eps=1.0.
dbscan_float = DBSCAN(eps=1.0, min_samples=1)
dbscan_float.fit(gdf_float)

print(dbscan_float.labels_)

0    0
1    1
2    2
dtype: int32


In [3]:
from joblib import dump, load

# Persist the fitted single-GPU DBSCAN estimator to disk. joblib serialises with
# pickle, so only ever reload model files you produced yourself.
model_path = 'DBSCAN.model'
dump(dbscan_float, model_path)

# To restore the estimator later, uncomment:
# loaded_model = load(model_path)

['DBSCAN.model']

---
## Multi GPU

In [4]:
import cudf
import numpy as np

# Size of the synthetic dataset: N_FILES CSVs of N_ROWS_PER_FILE rows each.
N_ROWS_PER_FILE = 10_000_000
N_FILES = 3
SEED = 42  # fixed seed so the generated CSVs are reproducible across runs


def random_df(nrows, rng=None):
    """Build a synthetic cuDF frame with a 0/1 label and three Gaussian features.

    Parameters
    ----------
    nrows : int
        Number of rows to generate.
    rng : numpy.random.Generator, optional
        Source of randomness. When omitted, NumPy's global legacy RNG
        (``np.random``) is used, exactly as before this parameter existed.

    Returns
    -------
    cudf.DataFrame
        Column ``a`` holds Binomial(n=1, p=0.5) draws (0 or 1); columns
        ``b``, ``c`` and ``d`` hold standard-normal draws. All have ``nrows`` rows.
    """
    # np.random (module) and a Generator expose the same binomial/normal calls.
    source = np.random if rng is None else rng
    return cudf.DataFrame({
        'a': source.binomial(n=1, p=0.5, size=(nrows,)),
        'b': source.normal(size=(nrows,)),
        'c': source.normal(size=(nrows,)),
        'd': source.normal(size=(nrows,)),
    })


# One shared seeded generator, so every file gets different but reproducible data.
data_rng = np.random.default_rng(SEED)
for file_idx in range(N_FILES):
    random_df(N_ROWS_PER_FILE, rng=data_rng).to_csv(f'{file_idx}.csv', index=False)


In [1]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Start a local Dask cluster of GPU workers and connect a client to it.
cluster = LocalCUDACluster()
client = Client(cluster)

# Derive the worker (GPU) count from the running cluster instead of hard-coding
# it. The repartition step later uses n_gpu to place exactly one partition on
# each worker, which only holds if n_gpu matches the real worker count.
n_gpu = len(client.scheduler_info()['workers'])

In [2]:
# Display the client's summary (scheduler address, dashboard link, workers, memory).
client

0,1
Client  Scheduler: tcp://127.0.0.1:44850  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 810.03 GB


In [2]:
import dask_cudf

# Read the CSV files in parallel across the Dask workers.
# NOTE(review): this glob matches *every* .csv in the working directory, not
# only the 0/1/2.csv files generated earlier; keep the directory clean or
# narrow the pattern.
# dtypes are keyed by column name, not given positionally, so a change in
# column order cannot silently assign the wrong type to a column.
df = dask_cudf.read_csv(
    "./*.csv",
    dtype={'a': 'int32', 'b': 'float32', 'c': 'float32', 'd': 'float32'},
)
df

Unnamed: 0_level_0,a,b,c,d
npartitions=6,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,int32,float32,float32,float32
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


**IMPORTANT:** `model.fit()` expects `X` to be partitioned so that every Dask worker used by the forest (`self.workers`) holds at least one partition.

If a worker has multiple data partitions, they will be concatenated before fitting, which will lead to additional memory usage. To minimize memory consumption, ensure that each worker has exactly one partition.

When persisting data, you can use `cuml.dask.common.utils.persist_across_workers` to simplify this:

In [3]:
from cuml.dask.common.utils import persist_across_workers

# Repartition once, *before* splitting off the label, so X and y are cut from
# the very same partitions and stay row-aligned. This is one partition per
# worker, assuming n_gpu equals the number of Dask workers.
df_by_gpu = df.repartition(npartitions=n_gpu)
y = df_by_gpu['a']
X = df_by_gpu.drop('a', axis=1)

# Materialise both collections on the cluster, spread across the workers.
X, y = persist_across_workers(client, [X, y])

X, y

(<dask_cudf.DataFrame | 8 tasks | 8 npartitions>,
 <dask_cudf.Series | 8 tasks | 8 npartitions>)

In [4]:
from cuml.dask.ensemble import RandomForestClassifier

# Distributed random-forest classifier trained on the persisted partitions.
# ignore_empty_partitions=True lets fit proceed when a worker holds no data.
# NOTE(review): per the cuML docs, such workers then contribute no trees;
# confirm for the installed cuML version.
rf_params = {
    'client': client,
    'ignore_empty_partitions': True,
}
rf_gpu = RandomForestClassifier(**rf_params)
rf_gpu.fit(X, y)



<cuml.dask.ensemble.randomforestclassifier.RandomForestClassifier at 0x7f026d673a50>