# K-Means Multi-Node Multi-GPU (MNMG) Demo

K-Means multi-Node multi-GPU implementation leverages Dask to spread data and computations across multiple workers. cuML uses One Process Per GPU (OPG) layout, which maps a single Dask worker to each GPU.

The main difference between cuML's MNMG implementation of k-means and the single-GPU is that the fit can be performed in parallel for each iteration, sharing only the centroids between iterations. The MNMG version also provides the same scalable k-means++ initialization algorithm as the single-GPU version.

Unlike the single-GPU implementation, The MNMG k-means API requires a Dask Dataframe or Array as input. `predict()` and `transform()` return the same type as input. The Dask cuDF Dataframe API is very similar to the Dask DataFrame API, but underlying Dataframes are cuDF, rather than Pandas. Dask cuPy arrays are also available.

For information about cuDF, refer to the [cuDF documentation](https://docs.rapids.ai/api/cudf/stable).

For additional information on cuML's k-means implementation: 
https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.cluster.KMeans.

## Imports

In [None]:
from cuml.dask.cluster.kmeans import KMeans as cuKMeans
from cuml.dask.common import to_dask_df
from cuml.dask.datasets import make_blobs
from cuml.metrics import adjusted_rand_score
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
from dask_ml.cluster import KMeans as skKMeans
import cupy as cp

## Start Dask Cluster

We can use the `LocalCUDACluster` to start a Dask cluster on a single machine with one worker mapped to each GPU. This is called one-process-per-GPU (OPG). 

In [None]:
cluster = LocalCUDACluster(threads_per_worker=1)
client = Client(cluster)

## Define Parameters

In [None]:
n_samples = 1000000
n_features = 2

n_total_partitions = len(list(client.has_what().keys()))

## Generate Data

### Device

We can generate a Dask cuPY Array of synthetic data for multiple clusters using `cuml.dask.datasets.make_blobs`.

In [None]:
X_dca, Y_dca = make_blobs(n_samples, 
                          n_features,
                          centers = 5, 
                          n_parts = n_total_partitions,
                          cluster_std=0.1, 
                          verbose=True)

### Host

We collect the Dask cuPy Array on a single node as a cuPy array. Then we transfer the cuPy array from device to host memory into a Numpy array.

In [None]:
X_cp = X_dca.compute()
X_np = cp.asnumpy(X_cp)
del X_cp

## Scikit-learn model

### Fit and predict

Since a scikit-learn equivalent to the multi-node multi-GPU K-means in cuML doesn't exist, we will use Dask-ML's implementation for comparison.

In [None]:
%%time
kmeans_sk = skKMeans(init="k-means||",
                     n_clusters=5,
                     n_jobs=-1,
                     random_state=100)

kmeans_sk.fit(X_np)

In [None]:
%%time
labels_sk = kmeans_sk.predict(X_np).compute()

## cuML Model

### Fit and predict

In [None]:
%%time
kmeans_cuml = cuKMeans(init="k-means||",
                       n_clusters=5,
                       random_state=100)

kmeans_cuml.fit(X_dca)

In [None]:
%%time
labels_cuml = kmeans_cuml.predict(X_dca).compute()

## Compare Results

In [None]:
score = adjusted_rand_score(labels_sk, labels_cuml)

In [None]:
passed = score == 1.0
print('compare kmeans: cuml vs sklearn labels_ are ' + ('equal' if passed else 'NOT equal'))