# KMeans

In [None]:
import numpy as np
import pandas as pd
import time

### Create synthetic dataset

In [None]:
from cuml.datasets import make_blobs

# Size and cluster structure of the synthetic dataset.
n_rows, n_cols = 100000, 500
n_clusters_data = 200
cluster_std = 1.0
dtype = "float32"

# make_blobs creates a random dataset of isotropic gaussian blobs.
# The second return value (the per-row labels) is not needed here.
data, _ = make_blobs(
    n_samples=n_rows,
    n_features=n_cols,
    centers=n_clusters_data,
    cluster_std=cluster_std,
    random_state=0,
    dtype=dtype,
)

# NOTE(review): make_blobs presumably returns a CuPy device array here;
# .get() would then bring it back as a host NumPy array - confirm.
data = data.get()

### Convert dataset to Spark DataFrame

In [None]:
pd_data = pd.DataFrame({"features": list(data)})
df = spark.createDataFrame(pd_data)

In [None]:
df.schema

### We will use this function to build both the Spark RAPIDS ML (GPU) and Spark ML (CPU) KMeans estimator objects, demonstrating the common API

In [None]:
def build_kmeans_estimator(
    estimator_class, k=200, tol=1.0e-20, max_iter=15, features_col="features"
):
    """Instantiate and configure a KMeans estimator through the shared setter API.

    The same function is used for the GPU and the CPU estimator classes, so both
    runs are configured with identical hyperparameters.

    Parameters
    ----------
    estimator_class : class
        Estimator class, instantiated with no arguments. It must provide chainable
        ``setTol``, ``setK``, ``setFeaturesCol`` and ``setMaxIter`` setters.
    k : int, default 200
        Value passed to ``setK``.
    tol : float, default 1.0e-20
        Value passed to ``setTol``.
    max_iter : int, default 15
        Value passed to ``setMaxIter``.
    features_col : str, default "features"
        Value passed to ``setFeaturesCol``.

    Returns
    -------
    The object returned by the last setter in the chain (the configured estimator
    when the setters return ``self``).
    """
    return (
        estimator_class()
        .setTol(tol)
        .setK(k)
        .setFeaturesCol(features_col)
        .setMaxIter(max_iter)
    )

## Spark RAPIDS ML (GPU)

In [None]:
from spark_rapids_ml.clustering import KMeans
gpu_kmeans = build_kmeans_estimator(KMeans)

The estimator can be persisted and reloaded.

In [None]:
estimator_path = "/tmp/kmeans-estimator"

In [None]:
gpu_kmeans.write().overwrite().save(estimator_path)
gpu_kmeans_loaded = KMeans.load(estimator_path)

### Fit

In [None]:
# Time the GPU fit. perf_counter() is a monotonic, high-resolution clock, so the
# measured interval is not affected by system clock adjustments during the fit.
start_time = time.perf_counter()
gpu_model = gpu_kmeans_loaded.fit(df)
gpu_fit = time.perf_counter() - start_time
print(f"Fit took: {gpu_fit} sec")

In [None]:
gpu_kmeans_loaded.getK()

In [None]:
sorted_clusters = sorted([vec.tolist() for vec in gpu_model.clusterCenters()])

In [None]:
[vec[0:10] for vec in sorted_clusters[0:2]]

### Transform

In [None]:
model_path = "/tmp/kmeans-model"

In [None]:
gpu_model.write().overwrite().save(model_path)

In [None]:
gpu_model_loaded = gpu_model.read().load(model_path)

In [None]:
[vec[0:10] for vec in sorted(gpu_model_loaded.cluster_centers_)[0:2]]

In [None]:
transformed_df = gpu_model_loaded.setPredictionCol("transformed").transform(df)

In [None]:
transformed_df.printSchema()

In [None]:
transformed_df.count()

In [None]:
transformed_df.show(10)

## Spark ML (CPU)

In [None]:
from pyspark.ml.clustering import KMeans
cpu_kmeans = build_kmeans_estimator(KMeans)

Convert the array-typed SQL column to the VectorUDT column expected by Spark ML algorithms. (Note: Spark RAPIDS ML accepts VectorUDT DataFrames in addition to the array-type DataFrame used above, as well as a scalar-column format; see the docs.)

In [None]:
from pyspark.ml.functions import array_to_vector

In [None]:
vector_df = df.select(array_to_vector("features").alias("features"))

### Fit

In [None]:
# Time the CPU fit. perf_counter() is a monotonic, high-resolution clock, so the
# measured interval is not affected by system clock adjustments during the fit.
start_time = time.perf_counter()
cpu_kmeans_model = cpu_kmeans.fit(vector_df)
cpu_fit = time.perf_counter() - start_time
print(f"Fit took: {cpu_fit} sec")

In [None]:
sorted_cpu_cluster_centers = sorted([vec.tolist() for vec in cpu_kmeans_model.clusterCenters()])
[vec[0:10] for vec in sorted_cpu_cluster_centers[0:2]]

### Transform

In [None]:
spark_transformed = cpu_kmeans_model.setPredictionCol("transformed").transform(vector_df)

In [None]:
spark_transformed.filter(spark_transformed.transformed >= 0).count()

In [None]:
spark_transformed.show(10)

## Pipeline: CPU MinMaxScaler + GPU KMeans

Note: cuML has a [MinMaxScaler](https://docs.rapids.ai/api/cuml/nightly/api.html#cuml.preprocessing.MinMaxScaler), but it would first need to be exposed as a [Spark ML Transformer](https://spark.apache.org/docs/latest/ml-pipeline.html#transformers) before it could be used in a pipeline (the performance impact of doing so is unknown), so the CPU MinMaxScaler from Spark ML is used here.

In [None]:
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.functions import array_to_vector

from spark_rapids_ml.clustering import KMeans

In [None]:
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
# Reuse the shared builder so tol, k and maxIter cannot drift from the standalone
# runs; only the columns specific to this pipeline are overridden afterwards.
gpu_kmeans = (
    build_kmeans_estimator(KMeans)
    .setFeaturesCol("scaled_features")  # consume the scaler's output column
    .setPredictionCol("transformed")
)

In [None]:
pipe = Pipeline(stages=[scaler, gpu_kmeans])

In [None]:
# Time the CPU-scaler + GPU-KMeans pipeline fit with a monotonic clock, so the
# interval is not affected by system clock adjustments.
start_time = time.perf_counter()
pipe_model = pipe.fit(vector_df)
cpu_gpu_pipe = time.perf_counter() - start_time
print(f"Fit took: {cpu_gpu_pipe} sec")

In [None]:
pipe_transformed = pipe_model.transform(vector_df)
pipe_transformed.show(10)

## Pipeline: CPU MinMaxScaler + CPU KMeans

In [None]:
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.functions import array_to_vector

from pyspark.ml.clustering import KMeans

In [None]:
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
# Reuse the shared builder so tol, k and maxIter cannot drift from the standalone
# runs; only the columns specific to this pipeline are overridden afterwards.
cpu_kmeans = (
    build_kmeans_estimator(KMeans)
    .setFeaturesCol("scaled_features")  # consume the scaler's output column
    .setPredictionCol("transformed")
)

In [None]:
pipe = Pipeline(stages=[scaler, cpu_kmeans])

In [None]:
# Time the all-CPU pipeline fit with a monotonic clock, so the interval is not
# affected by system clock adjustments.
start_time = time.perf_counter()
pipe_model = pipe.fit(vector_df)
cpu_cpu_pipe = time.perf_counter() - start_time
print(f"Fit took: {cpu_cpu_pipe} sec")

In [None]:
pipe_transformed = pipe_model.transform(vector_df)
pipe_transformed.show(10)

## Fake Pipeline (numpy)

In [1]:
import numpy as np
import pandas as pd
import time

In [2]:
from cuml.datasets import make_blobs

# Size and cluster structure of the synthetic dataset used by the fake-pipeline runs.
n_rows, n_cols = 10000, 500
n_clusters_data = 200
cluster_std = 1.0
dtype = "float32"

# make_blobs creates a random dataset of isotropic gaussian blobs.
# The second return value (the per-row labels) is not needed here.
data, _ = make_blobs(
    n_samples=n_rows,
    n_features=n_cols,
    centers=n_clusters_data,
    cluster_std=cluster_std,
    random_state=0,
    dtype=dtype,
)

# NOTE(review): make_blobs presumably returns a CuPy device array here;
# .get() would then bring it back as a host NumPy array - confirm.
data = data.get()

In [3]:
pd_data = pd.DataFrame({"features": list(data)})
df = spark.createDataFrame(pd_data).repartition(2).cache()
df.count()

23/06/26 10:00:41 WARN TaskSetManager: Stage 0 contains a task of very large size (1298 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

10000

In [4]:
df.schema

StructType([StructField('features', ArrayType(FloatType(), True), True)])

In [5]:
def build_kmeans_estimator(
    estimator_class, k=200, tol=1.0e-20, max_iter=15, features_col="features"
):
    """Instantiate and configure a KMeans estimator through the shared setter API.

    NOTE(review): this repeats the definition from earlier in the notebook,
    presumably so that this section can run in a fresh kernel - confirm whether
    both copies are still needed.

    Parameters
    ----------
    estimator_class : class
        Estimator class, instantiated with no arguments. It must provide chainable
        ``setTol``, ``setK``, ``setFeaturesCol`` and ``setMaxIter`` setters.
    k : int, default 200
        Value passed to ``setK``.
    tol : float, default 1.0e-20
        Value passed to ``setTol``.
    max_iter : int, default 15
        Value passed to ``setMaxIter``.
    features_col : str, default "features"
        Value passed to ``setFeaturesCol``.

    Returns
    -------
    The object returned by the last setter in the chain (the configured estimator
    when the setters return ``self``).
    """
    return (
        estimator_class()
        .setTol(tol)
        .setK(k)
        .setFeaturesCol(features_col)
        .setMaxIter(max_iter)
    )

In [6]:
from spark_rapids_ml.clustering import KMeans

In [7]:
gpu_kmeans = (
    KMeans(fake_pipe=True, use_comms=False, use_cupy=False)
    .setTol(1.0e-20)
    .setK(200)
    .setFeaturesCol("features")
    .setPredictionCol("transformed")
    .setMaxIter(15)
)

In [8]:
# Time the fit with a monotonic, high-resolution clock so the interval is not
# affected by system clock adjustments.
start_time = time.perf_counter()
pipe_model = gpu_kmeans.fit(df)
fake_pipe_numpy = time.perf_counter() - start_time
print(f"Fit took: {fake_pipe_numpy} sec")

                                                                                

Fit took: 9.797769784927368 sec


## Fake Pipeline (cupy)

In [9]:
from spark_rapids_ml.clustering import KMeans

In [10]:
gpu_kmeans = (
    KMeans(fake_pipe=True, use_comms=False, use_cupy=True)
    .setTol(1.0e-20)
    .setK(200)
    .setFeaturesCol("features")
    .setPredictionCol("transformed")
    .setMaxIter(15)
)

In [11]:
# Time the fit with a monotonic, high-resolution clock so the interval is not
# affected by system clock adjustments.
start_time = time.perf_counter()
pipe_model = gpu_kmeans.fit(df)
fake_pipe_cupy = time.perf_counter() - start_time
print(f"Fit took: {fake_pipe_cupy} sec")

[Stage 14:>                                                         (0 + 2) / 2]

Fit took: 4.041950464248657 sec


                                                                                

## Fake Pipeline (numpy, comms)

In [12]:
from spark_rapids_ml.clustering import KMeans

In [13]:
gpu_kmeans = (
    KMeans(fake_pipe=True, use_comms=True, use_nccl=False, use_cupy=False)
    .setTol(1.0e-20)
    .setK(200)
    .setFeaturesCol("features")
    .setPredictionCol("transformed")
    .setMaxIter(15)
)

In [14]:
# Time the fit with a monotonic, high-resolution clock so the interval is not
# affected by system clock adjustments.
start_time = time.perf_counter()
pipe_model = gpu_kmeans.fit(df)
fake_pipe_numpy_comms = time.perf_counter() - start_time
print(f"Fit took: {fake_pipe_numpy_comms} sec")

[Stage 18:>                                                         (0 + 2) / 2]

Fit took: 5.000985622406006 sec


                                                                                

## Fake Pipeline (cupy, comms)

In [15]:
from spark_rapids_ml.clustering import KMeans

In [16]:
gpu_kmeans = (
    KMeans(fake_pipe=True, use_comms=True, use_nccl=False, use_cupy=True)
    .setTol(1.0e-20)
    .setK(200)
    .setFeaturesCol("features")
    .setPredictionCol("transformed")
    .setMaxIter(15)
)

In [17]:
# Time the fit with a monotonic, high-resolution clock so the interval is not
# affected by system clock adjustments.
start_time = time.perf_counter()
pipe_model = gpu_kmeans.fit(df)
fake_pipe_cupy_comms = time.perf_counter() - start_time
print(f"Fit took: {fake_pipe_cupy_comms} sec")

[Stage 22:>                                                         (0 + 2) / 2]

Fit took: 4.967763185501099 sec


                                                                                

## Fake Pipeline (numpy, comms, nccl)

In [18]:
from spark_rapids_ml.clustering import KMeans

In [19]:
gpu_kmeans = (
    KMeans(fake_pipe=True, use_comms=True, use_nccl=True, use_cupy=False)
    .setTol(1.0e-20)
    .setK(200)
    .setFeaturesCol("features")
    .setPredictionCol("transformed")
    .setMaxIter(15)
)

In [20]:
# Time the fit with a monotonic, high-resolution clock so the interval is not
# affected by system clock adjustments.
start_time = time.perf_counter()
pipe_model = gpu_kmeans.fit(df)
fake_pipe_numpy_nccl = time.perf_counter() - start_time
print(f"Fit took: {fake_pipe_numpy_nccl} sec")

[Stage 26:>                                                         (0 + 2) / 2]

Fit took: 4.081805944442749 sec


                                                                                

## Fake Pipeline (cupy, comms, nccl)

In [21]:
from spark_rapids_ml.clustering import KMeans

In [22]:
gpu_kmeans = (
    KMeans(fake_pipe=True, use_comms=True, use_nccl=True, use_cupy=True)
    .setTol(1.0e-20)
    .setK(200)
    .setFeaturesCol("features")
    .setPredictionCol("transformed")
    .setMaxIter(15)
)

In [23]:
# Time the fit with a monotonic, high-resolution clock so the interval is not
# affected by system clock adjustments.
start_time = time.perf_counter()
pipe_model = gpu_kmeans.fit(df)
fake_pipe_cupy_nccl = time.perf_counter() - start_time
print(f"Fit took: {fake_pipe_cupy_nccl} sec")

[Stage 30:>                                                         (0 + 2) / 2]

Fit took: 4.024225473403931 sec


                                                                                

### Summary

In [None]:
# Report every fit timing recorded in this notebook, labelled so the numbers can be
# told apart. Keys are the names of the timing variables assigned by the fit cells.
timing_labels = {
    "gpu_fit": "Spark RAPIDS ML KMeans (GPU)",
    "cpu_fit": "Spark ML KMeans (CPU)",
    "cpu_gpu_pipe": "Pipeline: CPU MinMaxScaler + GPU KMeans",
    "cpu_cpu_pipe": "Pipeline: CPU MinMaxScaler + CPU KMeans",
    "fake_pipe_numpy": "Fake Pipeline (numpy)",
    "fake_pipe_cupy": "Fake Pipeline (cupy)",
    "fake_pipe_numpy_comms": "Fake Pipeline (numpy, comms)",
    "fake_pipe_cupy_comms": "Fake Pipeline (cupy, comms)",
    "fake_pipe_numpy_nccl": "Fake Pipeline (numpy, comms, nccl)",
    "fake_pipe_cupy_nccl": "Fake Pipeline (cupy, comms, nccl)",
}

for timing_name, timing_label in timing_labels.items():
    # Look the variable up by name so that a section that was not executed in this
    # kernel session is reported as such instead of raising NameError.
    elapsed = globals().get(timing_name)
    if elapsed is None:
        print(f"{timing_label}: not run")
    else:
        print(f"{timing_label}: {elapsed} sec")

# Scratch

In [None]:
from cuml.preprocessing import MinMaxScaler
import cupy as cp

data = [[-1.0, 2], 
        [-0.5, 6],
        [ 0.0, 10],
        [ 1.0, 18]]

data = cp.array(data)
scaler = MinMaxScaler()
print(scaler.fit(data))

In [None]:
scaler.data_max_
# [ 1. 18.]

In [None]:
scaler.data_min_
# [-1.  2.]

In [None]:
scaler.data_range_

In [None]:
scaler.feature_range

In [None]:
scaler.scale_

In [None]:
scaler.min_

In [None]:
print(scaler.transform(data))
# [[0.   0.  ]
#  [0.25 0.25]
#  [0.5  0.5 ]
#  [1.   1.  ]]

In [None]:
print(scaler.transform(cp.array([[2, 2]])))
# [[1.5 0. ]]

## Manually calculate scale_ and min_

In [None]:
scale_ = (scaler.feature_range[1] - scaler.feature_range[0]) / (scaler.data_max_ - scaler.data_min_)
scale_

In [None]:
min_ = scaler.feature_range[0] - scaler.data_min_ * scale_
min_

## New Scaler

In [None]:
data = cp.array([[20., 20.],
                 [-20., -20.]])
new_scaler = MinMaxScaler()
new_scaler.fit(data)

In [None]:
new_scaler.scale_
# array([0.025, 0.025])

In [None]:
new_scaler.min_
# array([0.5, 0.5])

In [None]:
new_scaler.transform(cp.array([[2, 2]]))
# array([[0.55, 0.55]])

## Modify the scaler

In [None]:
scaler.scale_ = new_scaler.scale_
scaler.min_ = new_scaler.min_

In [None]:
print(scaler.transform(cp.array([[2, 2]])))

In [None]:
scale_ = (scaler.feature_range[1] - scaler.feature_range[0]) / (scaler.data_max_ - scaler.data_min_)
scale_

### NCCL

In [None]:
from cupy.cuda import nccl

In [None]:
# The original cell held only the incomplete expression `nccl.`, which is a SyntaxError.
# Show the NCCL version CuPy reports, to confirm that the bindings are usable.
nccl.get_version()