/
kmeans.py
302 lines (244 loc) · 10.1 KB
/
kmeans.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from cuml.internals.memory_utils import with_cupy_rmm
from cuml.dask.common.utils import wait_and_raise_from_futures
from raft_dask.common.comms import get_raft_comm_state
from raft_dask.common.comms import Comms
from cuml.dask.common.input_utils import DistributedDataHandler
from cuml.dask.common.input_utils import concatenate
from cuml.dask.common.base import mnmg_import
from dask.distributed import get_worker
from cuml.dask.common.base import DelayedTransformMixin
from cuml.dask.common.base import DelayedPredictionMixin
from cuml.dask.common.base import BaseEstimator
from cuml.internals.safe_imports import gpu_only_import
cp = gpu_only_import("cupy")
class KMeans(BaseEstimator, DelayedPredictionMixin, DelayedTransformMixin):
    """
    Multi-Node Multi-GPU implementation of KMeans.

    This version minimizes data transfer by sharing only
    the centroids between workers in each iteration.
    Predictions are done embarrassingly parallel, using cuML's
    single-GPU version.

    For more information on this implementation, refer to the
    documentation for single-GPU K-Means.

    Parameters
    ----------
    handle : cuml.Handle
        Specifies the cuml.handle that holds internal CUDA state for
        computations in this model. Most importantly, this specifies the CUDA
        stream that will be used for the model's computations, so users can
        run different models concurrently in different streams by creating
        handles in several streams.
        If it is None, a new one is created.
    n_clusters : int (default = 8)
        The number of centroids or clusters you want.
    max_iter : int (default = 300)
        The more iterations of EM, the more accurate, but slower.
    tol : float (default = 1e-4)
        Stopping criterion when centroid means do not change much.
    verbose : int or boolean, default=False
        Sets logging level. It must be one of `cuml.common.logger.level_*`.
        See :ref:`verbosity-levels` for more info.
    random_state : int (default = 1)
        If you want results to be the same when you restart Python,
        select a state.
    init : {'scalable-kmeans++', 'k-means||' , 'random' or an ndarray} \
           (default = 'scalable-k-means++')
        'scalable-k-means++' or 'k-means||': Uses fast and stable scalable
        kmeans++ initialization.
        'random': Choose 'n_cluster' observations (rows) at random
        from data for the initial centroids. If an ndarray is passed,
        it should be of shape (n_clusters, n_features) and gives the
        initial centers.
    oversampling_factor : int (default = 2)
        The amount of points to sample in scalable k-means++ initialization
        for potential centroids. Increasing this value can lead to better
        initial centroids at the cost of memory. The total number of centroids
        sampled in scalable k-means++ is oversampling_factor * n_clusters * 8.
    max_samples_per_batch : int (default = 32768)
        The number of data samples to use for batches of the pairwise distance
        computation. This computation is done throughout both fit predict.
        The default should suit most cases. The total number of elements in
        the batched pairwise distance computation is max_samples_per_batch
        * n_clusters. It might become necessary to lower this number when
        n_clusters becomes prohibitively large.

    Attributes
    ----------
    cluster_centers_ : cuDF DataFrame or CuPy ndarray
        The coordinates of the final clusters. This represents of "mean" of
        each data cluster.
    """

    def __init__(self, *, client=None, verbose=False, **kwargs):
        # All estimator hyperparameters are forwarded through ``kwargs`` and
        # stored by the base class in ``self.kwargs``.
        super().__init__(client=client, verbose=verbose, **kwargs)

    @staticmethod
    @mnmg_import
    def _func_fit(sessionId, objs, datatype, has_weights, **kwargs):
        """
        Worker-side fit task: concatenates the worker-local partitions and
        runs the multi-GPU KMeans (KMeansMG) on them, communicating centroids
        with its peers through the RAFT comms handle for ``sessionId``.

        Parameters
        ----------
        sessionId : comms session identifier used to look up the worker's
            RAFT handle.
        objs : list of local partitions; either arrays/dataframes, or
            (X, weights) tuples when ``has_weights`` is True.
        datatype : output type to configure on the local model.
        has_weights : bool
            Whether each element of ``objs`` carries a sample-weight part.

        Returns
        -------
        The fitted local KMeansMG model.
        """
        # Deferred import so this only resolves on a GPU worker.
        from cuml.cluster.kmeans_mg import KMeansMG as cumlKMeans

        handle = get_raft_comm_state(sessionId, get_worker())["handle"]

        if not has_weights:
            inp_data = concatenate(objs)
            inp_weights = None
        else:
            inp_data = concatenate([X for X, weights in objs])
            inp_weights = concatenate([weights for X, weights in objs])

        return cumlKMeans(handle=handle, output_type=datatype, **kwargs).fit(
            inp_data, sample_weight=inp_weights
        )

    @staticmethod
    def _score(model, data, sample_weight=None):
        # Worker-side scoring task: delegates to the local model's score()
        # (negative inertia on the local partition).
        ret = model.score(data, sample_weight=sample_weight)
        return ret

    @staticmethod
    def _check_normalize_sample_weight(sample_weight):
        """
        Normalize ``sample_weight`` so that it sums to the number of samples.

        Returns None unchanged. Otherwise returns a *new* array scaled by
        n_samples / sum(weights); the caller's array is not modified.
        """
        if sample_weight is not None:
            n_samples = len(sample_weight)
            scale = n_samples / sample_weight.sum()
            # Use an out-of-place multiply: the previous in-place ``*=``
            # silently mutated the user's input array.
            sample_weight = sample_weight * scale
        return sample_weight

    @with_cupy_rmm
    def fit(self, X, sample_weight=None):
        """
        Fit a multi-node multi-GPU KMeans model

        Parameters
        ----------
        X : Dask cuDF DataFrame or CuPy backed Dask Array
            Training data to cluster.
        sample_weight : Dask cuDF DataFrame or CuPy backed Dask Array \
            shape = (n_samples,), default=None # noqa
            The weights for each observation in X. If None, all observations
            are assigned equal weight.
            Acceptable formats: cuDF DataFrame, NumPy ndarray, Numba device
            ndarray, cuda array interface compliant array like CuPy

        Returns
        -------
        self : the fitted estimator.
        """
        sample_weight = self._check_normalize_sample_weight(sample_weight)

        # When weights are present, ship (X, weights) pairs so each worker
        # receives the weights aligned with its own partitions.
        inputs = X if sample_weight is None else (X, sample_weight)

        data = DistributedDataHandler.create(inputs, client=self.client)
        self.datatype = data.datatype

        # This needs to happen on the scheduler
        comms = Comms(comms_p2p=False, client=self.client)
        comms.init(workers=data.workers)

        # One fit task per worker, each over that worker's partitions.
        # pure=False because the tasks are stateful (comms side effects).
        kmeans_fit = [
            self.client.submit(
                KMeans._func_fit,
                comms.sessionId,
                wf[1],
                self.datatype,
                data.multiple,
                **self.kwargs,
                workers=[wf[0]],
                pure=False,
            )
            for idx, wf in enumerate(data.worker_to_parts.items())
        ]

        wait_and_raise_from_futures(kmeans_fit)

        comms.destroy()

        # All workers converge to the same centroids, so any single local
        # model (here the first) represents the fitted state.
        self._set_internal_model(kmeans_fit[0])

        return self

    def fit_predict(self, X, sample_weight=None, delayed=True):
        """
        Compute cluster centers and predict cluster index for each sample.

        Parameters
        ----------
        X : Dask cuDF DataFrame or CuPy backed Dask Array
            Data to predict

        Returns
        -------
        result: Dask cuDF DataFrame or CuPy backed Dask Array
            Distributed object containing predictions
        """
        return self.fit(X, sample_weight=sample_weight).predict(
            X, sample_weight=sample_weight, delayed=delayed
        )

    def predict(self, X, sample_weight=None, delayed=True):
        """
        Predict labels for the input

        Parameters
        ----------
        X : Dask cuDF DataFrame or CuPy backed Dask Array
            Data to predict
        delayed : bool (default = True)
            Whether to do a lazy prediction (and return Delayed objects) or an
            eagerly executed one.

        Returns
        -------
        result: Dask cuDF DataFrame or CuPy backed Dask Array
            Distributed object containing predictions
        """
        # Weights are normalized here once; workers must not re-normalize
        # their local shards, hence normalize_weights=False below.
        sample_weight = self._check_normalize_sample_weight(sample_weight)
        return self._predict(
            X,
            delayed=delayed,
            sample_weight=sample_weight,
            normalize_weights=False,
        )

    def fit_transform(self, X, sample_weight=None, delayed=True):
        """
        Calls fit followed by transform using a distributed KMeans model

        Parameters
        ----------
        X : Dask cuDF DataFrame or CuPy backed Dask Array
            Data to predict
        delayed : bool (default = True)
            Whether to execute as a delayed task or eager.

        Returns
        -------
        result: Dask cuDF DataFrame or CuPy backed Dask Array
            Distributed object containing the transformed data
        """
        return self.fit(X, sample_weight=sample_weight).transform(
            X, delayed=delayed
        )

    def transform(self, X, delayed=True):
        """
        Transforms the input into the learned centroid space

        Parameters
        ----------
        X : Dask cuDF DataFrame or CuPy backed Dask Array
            Data to predict
        delayed : bool (default = True)
            Whether to execute as a delayed task or eager.

        Returns
        -------
        result: Dask cuDF DataFrame or CuPy backed Dask Array
            Distributed object containing the transformed data
        """
        # Output has one distance column per cluster -> 2-dimensional result.
        return self._transform(X, n_dims=2, delayed=delayed)

    @with_cupy_rmm
    def score(self, X, sample_weight=None):
        """
        Computes the inertia score for the trained KMeans centroids.

        Parameters
        ----------
        X : dask_cudf.Dataframe
            Dataframe to compute score

        Returns
        -------
        Inertial score
        """
        sample_weight = self._check_normalize_sample_weight(sample_weight)
        scores = self._run_parallel_func(
            KMeans._score,
            X,
            sample_weight=sample_weight,
            n_dims=1,
            delayed=False,
            output_futures=True,
        )
        # Each worker already returns its (negative) local inertia, so the
        # total is a plain sum. The former ``-1 * cp.sum(... * -1.0)`` was a
        # double negation that cancelled out; this computes the same value.
        return cp.sum(cp.asarray(self.client.compute(scores, sync=True)))

    def get_param_names(self):
        # Hyperparameters are exactly the kwargs captured at construction.
        return list(self.kwargs.keys())