# Multi-GPU K-Means with Dask

In this notebook you will use GPU-accelerated K-means with Dask to identify population clusters in a way that scales across multiple GPUs and multiple nodes.
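For reference, K-means groups the points $x_1, \dots, x_n$ (here, each row is one `northing`, `easting` pair) into $k$ clusters $C_1, \dots, C_k$ by approximately minimizing the within-cluster sum of squared distances to each cluster's centroid $\mu_j$. The standard (Lloyd-style) algorithm alternates between assigning every point to its nearest centroid and recomputing each centroid as the mean of its cluster:

$$
\min_{C_1,\dots,C_k} \sum_{j=1}^{k} \sum_{x_i \in C_j} \lVert x_i - \mu_j \rVert^2,
\qquad
\mu_j = \frac{1}{|C_j|} \sum_{x_i \in C_j} x_i
$$

In the code below, $k$ corresponds to the `n_clusters` argument.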

## Objectives

By the time you complete this notebook you will be able to:

- Use distributed, GPU-accelerated K-means with Dask

## Imports

First, we import the modules needed to create a Dask-CUDA cluster.

In [1]:
import subprocess
import logging

from dask.distributed import Client, wait, progress
from dask_cuda import LocalCUDACluster

After that, we create the cluster.

In [2]:
# Get this machine's IP address (first entry from `hostname --all-ip-addresses`)
cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = output.decode().split()[0]

# Build the Dask cluster (by default, one worker per local GPU)
# silence_logs=logging.ERROR: only print log messages at ERROR level or above
cluster = LocalCUDACluster(ip=IPADDR, silence_logs=logging.ERROR)

# Create a Client that connects to the cluster
client = Client(cluster)

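The cell above shells out to `hostname --all-ip-addresses`, an option that is common on Linux but may not be available on every platform. As an alternative, here is a stdlib-only sketch (an assumption, not what this course uses) that asks the operating system which local interface it would use to reach a private address. Calling `connect` on a UDP socket sends no packets. The function name `local_ip` and the probe address are hypothetical choices, and on hosts with several interfaces the result may differ from the first address that `hostname` prints.

```python
import ipaddress
import socket


def local_ip(probe_host: str = "10.255.255.255", probe_port: int = 1) -> str:
    """Best-effort guess at this host's primary IPv4 address (hypothetical helper)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # UDP connect only selects a route and a local address; nothing is sent.
        sock.connect((probe_host, probe_port))
        return sock.getsockname()[0]
    except OSError:
        # No usable route (e.g. no network): fall back to loopback.
        return "127.0.0.1"
    finally:
        sock.close()


IPADDR = local_ip()
print(IPADDR)
```

The returned string could then be passed as `ip=IPADDR` to `LocalCUDACluster`, just like the value obtained from `hostname`.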


Finally, as we did before, we import the libraries that create CUDA contexts (cuDF, Dask-cuDF, and cuML) only after setting up the cluster, so that they don't lock onto a single device.
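To check that this import order was respected, you could run a small stdlib-only guard like the one below before the cluster is created. The module list and the function name `imported_too_early` are assumptions for illustration, not part of the course material.

```python
import sys

# Modules treated here as "CUDA context creators" (assumed list; extend as needed).
CUDA_CONTEXT_MODULES = ("cudf", "dask_cudf", "cuml", "cupy")


def imported_too_early(modules=CUDA_CONTEXT_MODULES):
    """Return the names from `modules` that are already imported in this process."""
    return [name for name in modules if name in sys.modules]


# Run before creating LocalCUDACluster: a non-empty list means the
# recommended import order was not followed in this kernel session.
early = imported_too_early()
if early:
    print(f"Imported before cluster setup: {early}")
```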

In [3]:
import cudf
import dask_cudf

import cuml
from cuml.dask.cluster import KMeans

## Load and Persist Data

We begin by loading the data. The data set has two grid coordinate columns, `northing` and `easting`, derived from the main population data set we have prepared. Both columns are read as `float32`.

In [4]:
ddf = dask_cudf.read_csv('./data/pop5x_2-07.csv', dtype=['float32', 'float32'])

Training the K-means model looks very similar to both the scikit-learn version and the cuML single-GPU version. Because we have set up the client and import `KMeans` from the `cuml.dask.cluster` module, the algorithm automatically uses the local Dask cluster we created.

Note that calling `.fit` triggers Dask computation.
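To make "the algorithm automatically uses the Dask cluster" more concrete, here is a CPU-only NumPy sketch of one data-parallel Lloyd iteration. Each partition assigns its points to the nearest center and returns only per-cluster sums and counts, and those small arrays are combined to update the centers. This is a simplified illustration of the general technique, not cuML's implementation (cuML also handles initialization, convergence checks, and communication between GPUs). The function names and the synthetic data are hypothetical.

```python
import numpy as np


def partial_stats(part, centers):
    """Per-partition work: label points by nearest center, return sums and counts."""
    # Squared Euclidean distance from every point to every center: shape (n, k)
    d2 = ((part[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
    labels = d2.argmin(axis=1)
    k, dim = centers.shape
    sums = np.zeros((k, dim))
    np.add.at(sums, labels, part)          # accumulate points into their cluster's row
    counts = np.bincount(labels, minlength=k)
    return sums, counts


def lloyd_step(partitions, centers):
    """One data-parallel Lloyd iteration: map over partitions, then reduce."""
    results = [partial_stats(p, centers) for p in partitions]  # "map" step
    sums = sum(r[0] for r in results)                           # "reduce" step
    counts = sum(r[1] for r in results)
    new_centers = centers.copy()
    nonempty = counts > 0
    # In this sketch, an empty cluster simply keeps its previous center
    new_centers[nonempty] = sums[nonempty] / counts[nonempty][:, None]
    return new_centers


rng = np.random.default_rng(0)
X = np.concatenate([rng.normal(0, 1, (500, 2)), rng.normal(10, 1, (500, 2))])
partitions = np.array_split(X, 4)   # stand-ins for Dask partitions on separate GPUs
centers = X[[0, 999]].copy()        # naive initialization, for illustration only
for _ in range(10):
    centers = lloyd_step(partitions, centers)
print(np.round(centers, 2))
```

Because only `k` sums and counts leave each partition, the amount of data exchanged per iteration does not grow with the number of rows, which is what lets this approach scale across workers.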

In [5]:
%%time
dkm = KMeans(n_clusters=20)
dkm.fit(ddf)

CPU times: user 3.95 s, sys: 614 ms, total: 4.57 s
Wall time: 1min 49s


<cuml.dask.cluster.kmeans.KMeans at 0x7f1c7fc4ad90>

Once the model is fit, we extract the cluster centers and rename their generic column names, `0` and `1`, to match the columns of the data the model was trained on.

In [12]:
cluster_centers = dkm.cluster_centers_
cluster_centers.columns = ddf.columns
cluster_centers.dtypes

northing    float32
easting     float32
dtype: object

In [17]:
cluster_centers.head(17)

|   | northing | easting |
|---|---|---|
| 0 | 1467789.0 | 516252.9375 |
| 1 | -4312184.0 | 611836.625 |
| 2 | 345116.9 | 394572.0 |
| 3 | 3611132.0 | 417882.0 |
| 4 | -2189149.0 | 555707.3125 |
| 5 | -3840314.0 | 569422.6875 |
| 6 | 858815.4 | 527184.8125 |
| 7 | -691582.8 | 558732.3125 |
| 8 | -5018054.0 | 644388.625 |
| 9 | 3020440.0 | 481503.0625 |


## Exercise: Count Members of the Southernmost Cluster

Using the `cluster_centers`, identify which cluster is the southernmost (has the lowest `northing` value) with the `nsmallest` method, then use `dkm.predict` to get labels for the data, and finally filter the labels to determine how many individuals the model estimated were in that cluster.

In [24]:
# Index of the cluster center with the smallest `northing` value (the southernmost)
south_idx = cluster_centers.nsmallest(1, 'northing').index[0]
# Predict a cluster label for every data point
labels_predicted = dkm.predict(ddf)

# Count the data points whose label equals south_idx
labels_predicted[labels_predicted==south_idx].compute().shape[0]

9505217

In [14]:
# %load solutions/southernmost_cluster
south_idx = cluster_centers.nsmallest(1, 'northing').index[0]
labels_predicted = dkm.predict(ddf)
labels_predicted[labels_predicted==south_idx].compute().shape[0]
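The three steps of the exercise can be mirrored on toy NumPy arrays, which makes the logic easy to check without a GPU. The arrays below are made-up values, and nearest-center assignment stands in for `dkm.predict` (that is what K-means prediction computes, though cuML does it on the GPU across partitions).

```python
import numpy as np

# Hypothetical stand-ins: cluster centers and points as (northing, easting) rows
centers = np.array([[500.0, 100.0], [-900.0, 200.0], [300.0, 50.0]])
points = np.array([[-850.0, 190.0], [480.0, 110.0], [-950.0, 210.0], [310.0, 40.0]])

# Step 1: southernmost center = smallest value in the northing column (column 0)
south_idx = int(np.argmin(centers[:, 0]))

# Step 2: "predict" = label each point with its nearest center (squared Euclidean distance)
d2 = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
labels = d2.argmin(axis=1)

# Step 3: count matching labels without materializing the filtered rows
n_south = int(np.count_nonzero(labels == south_idx))
print(south_idx, n_south)
```

Step 3 reflects a design choice: counting a boolean mask avoids building the filtered array. On the Dask series, an analogous form such as `(labels_predicted == south_idx).sum().compute()` should give the same count while transferring only a single number back to the client.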


<br>
<div align="center"><h2>Please Restart the Kernel</h2></div>

In [25]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}



## Next

In the next notebook, you will calculate infection risk again, this time using the powerful XGBoost algorithm.